最新消息:

比特币源码剖析(七)之任务调度

Bitcoin justnode 348浏览

比特币中的任务调度,是个简单的生产者消费者模型。该模型通过一个条件变量,一个互斥锁以及一个消息队列来实现。

首先我们通过一个demo来学习条件变量和互斥锁的使用:

boost::mutex mutex;
boost::condition_variable cv;
std::string data;
bool ready = false;  // 条件
bool processed = false;  // 条件
 
void Worker() {
    boost::unique_lock<boost::mutex> lock(mutex);
 
    // 等待生产者线程发送数据。
    cv.wait(lock, [] { return ready; });
 
    // 等待后,继续拥有锁。
    std::cout << "消费者线程正在处理数据..." << std::endl;
    // 睡眠一秒以模拟数据处理。
    boost::this_thread::sleep_for(boost::chrono::seconds(1));
    data += " 已处理";
 
    // 把数据发回生产者线程。
    processed = true;
    std::cout << "消费者线程通知数据已经处理完毕。" << std::endl;
 
    // 通知前,手动解锁以防正在等待的线程被唤醒后又立即被阻塞。
    //lock.unlock();
 
    cv.notify_one();
}
 
int main() {
    boost::thread worker(Worker);
 
    // 把数据发送给消费者线程。
    {
        boost::lock_guard<boost::mutex> lock(mutex); 
        std::cout << "生产者线程正在准备数据..." << std::endl;
        // 睡眠一秒以模拟数据准备。
        boost::this_thread::sleep_for(boost::chrono::seconds(1));
        data = "样本数据";
        ready = true;
        std::cout << "生产者线程通知数据已经准备完毕。" << std::endl;
    }
    cv.notify_one();
 
    // 等待消费者线程处理数据。
    {
        boost::unique_lock<boost::mutex> lock(mutex); 
        cv.wait(lock, [] { return processed; });
    }
    std::cout << "回到生产者线程,数据 = " << data << std::endl;
 
    worker.join();
 
    return 0;
}

以上代码中,消费者线程会先阻塞,等待生产者线程生产数据,之后通过cv.notify_one()唤醒阻塞的消费者线程。

比特币源码中的实现跟上面代码类似,但又有所不同。源码中定义了一个CScheduler类,该类中的成员变量taskQueue是multimap类型的任务队列,成员方法scheduleEvery,scheduleFromNow,schedule都是向taskQueue添加任务。与此同时,比特币在初始化的时候,创建了一个线程,这个线程专门负责从taskQueue中读取任务,然后执行。

在AppInit2中创建了一个线程,执行serviceLoop函数。

    // Start the lightweight task scheduler thread
    CScheduler::Function serviceLoop = boost::bind(&CScheduler::serviceQueue, &scheduler);
    threadGroup.create_thread(boost::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));

成员方法scheduleEvery,scheduleFromNow都是通过调用schedule实现了向taskQueue中添加任务,下面我们分析schedule的实现



void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t)
{
    {
        boost::unique_lock<boost::mutex> lock(newTaskMutex);
        taskQueue.insert(std::make_pair(t, f));
    }
    newTaskScheduled.notify_one();
}

在向taskQueue添加任务后,会调用newTaskScheduled.notify_one()唤醒任务队列中阻塞的一项任务。