了解pthread锁和条件变量

我在C中进行了有关线程,锁和条件变量的练习。我需要编写一个获取数据的程序,将其转换为链接列表,并开始3个线程,每个线程为列表和主线程中的每个节点计算结果在evreyone完成后打印结果。

这是主要功能:

int thread_finished_count;

// Lock and Conditional variable
pthread_mutex_t list_lock;
pthread_mutex_t thread_lock;
pthread_cond_t thread_cv;

int main(int argc,char const *argv[])
{
    node *list;
    int pairs_count,status;
    thread_finished_count = 0;

    /* get the data and start the threads */
    node *head = create_numbers(argc,argv,&pairs_count);
    list = head; // backup head for results
    pthread_t *threads = start_threads(&list);

    /* wait for threads and destroy lock */
    status = pthread_cond_wait(&thread_cv,&list_lock);
    chcek_status(status);
    status = pthread_mutex_destroy(&list_lock);
    chcek_status(status);
    status = pthread_mutex_destroy(&thread_lock);
    chcek_status(status);

    /* print result in original list */
    print_results(head);

    /* cleanup */
    wait_for_threads(threads,NUM_THREADS);
    free_list(head);
    free(threads);

    return EXIT_SUCCESS;
}

请注意,create_numbers函数正常运行,并且列表按预期运行。

这是start_thread和thread_function代码:

pthread_t *start_threads(node **list)
{
    int status;
    pthread_t *threads = (pthread_t *)malloc(sizeof(pthread_t) * NUM_THREADS);
    check_malloc(threads);

    for (int i = 0; i < NUM_THREADS; i++)
    {
        status = pthread_create(&threads[i],NULL,thread_function,list);
        chcek_status(status);
    }
    return threads;
}

void *thread_function(node **list)
{
    int status,self_id = pthread_self();
    printf("im in %u\n",self_id);
    node *currentNode;

    while (1)
    {
        if (!(*list))
            break;
        status = pthread_mutex_lock(&list_lock);
        chcek_status(status);
        printf("list location %p thread %u\n",*list,self_id);
        if (!(*list))
        {
            status = pthread_mutex_unlock(&list_lock);
            chcek_status(status);
            break;
        }
        currentNode = (*list);
        (*list) = (*list)->next;
        status = pthread_mutex_unlock(&list_lock);
        chcek_status(status);
        currentNode->gcd = gcd(currentNode->num1,currentNode->num2);
        status = usleep(10);
        chcek_status(status);
    }
    status = pthread_mutex_lock(&thread_lock);
    chcek_status(status);
    thread_finished_count++;
    status = pthread_mutex_unlock(&thread_lock);
    chcek_status(status);
    if (thread_finished_count != 3)
        return NULL;
    status = pthread_cond_signal(&thread_cv);
    chcek_status(status);
    return NULL;
}
void chcek_status(int status)
{
    if (status != 0)
    {
        fputs("pthread_function() error\n",stderr);
        exit(EXIT_FAILURE);
    }
}

请注意,self_id用于调试目的。

我的问题

  1. 我的主要问题是关于分工。因此,每个线程都从全局链接列表中获取一个元素,计算gcd,然后继续获取下一个元素。仅当在while循环中解锁互斥锁后添加usleep(10)时,才能获得此效果。如果我不添加usleep,则FIRST线程将进入并执行所有工作,而其他线程将在所有工作完成后等待并进入。

请注意!:我考虑了可能创建第一个线程的选择,直到创建第二个线程为止,第一个线程已经完成了所有作业。这就是为什么在创建evrey线程时使用usleep(10)添加“我在#threadID中”检查的原因。他们都进来了,但只有第一个工作在做所有工作。 这是输出示例,如果我在互斥体解锁(注意不同的线程ID)后做usleep

睡着了

./v2 nums.txt
im in 1333593856
list location 0x7fffc4fb56a0 thread 1333593856
im in 1316685568
im in 1325139712
list location 0x7fffc4fb56c0 thread 1333593856
list location 0x7fffc4fb56e0 thread 1316685568
list location 0x7fffc4fb5700 thread 1325139712
list location 0x7fffc4fb5720 thread 1333593856
list location 0x7fffc4fb5740 thread 1316685568
list location 0x7fffc4fb5760 thread 1325139712
list location 0x7fffc4fb5780 thread 1333593856
list location 0x7fffc4fb57a0 thread 1316685568
list location 0x7fffc4fb57c0 thread 1325139712
list location 0x7fffc4fb57e0 thread 1333593856
list location 0x7fffc4fb5800 thread 1316685568
list location (nil) thread 1325139712
list location (nil) thread 1333593856
...
normal result output
...

如果我在互斥锁锁定后注释掉usleep,那就输出了(注意相同的线程ID) 无睡眠

  ./v2 nums.txt
im in 2631730944
list location 0x7fffe5b946a0 thread 2631730944
list location 0x7fffe5b946c0 thread 2631730944
list location 0x7fffe5b946e0 thread 2631730944
list location 0x7fffe5b94700 thread 2631730944
list location 0x7fffe5b94720 thread 2631730944
list location 0x7fffe5b94740 thread 2631730944
list location 0x7fffe5b94760 thread 2631730944
list location 0x7fffe5b94780 thread 2631730944
list location 0x7fffe5b947a0 thread 2631730944
list location 0x7fffe5b947c0 thread 2631730944
list location 0x7fffe5b947e0 thread 2631730944
list location 0x7fffe5b94800 thread 2631730944
im in 2623276800
im in 2614822656
...
normal result output
...
  1. 我的第二个问题是关于线程工作的顺序。我的练习要求我不要使用join来同步线程(仅在末尾使用以“释放资源”),而是要理解使用该条件变量。

我的目标是每个线程将获取元素,进行计算,同时另一个线程将进入并获取另一个元素,新线程将获取每个元素(或至少接近该元素)

感谢您的阅读,请多多包涵。

iCMS 回答:了解pthread锁和条件变量

首先,您在按住锁的同时执行gcd()工作...因此(a)在任何时候,只有一个线程可以完成任何工作,尽管( b)并不能完全解释为什么只有一个线程似乎可以完成(几乎)所有工作-正如KamilCuk所说的那样,可能要做的工作很少,几乎所有工作都已经完成了。第二个线程正常唤醒。 [更奇特的是,线程'a'解锁互斥锁和另一个线程开始运行之间可能会有一些延迟,因此线程'a'可以在另一个线程到达那里之前获取互斥锁。]

POSIX说,当一个互斥锁被解锁时,如果有服务员,那么“调度策略将确定哪个线程将获取该互斥锁”。据我所知,默认的“计划策略”是定义的实现。

您可以尝试以下几种方法:(1)使用pthread_barrier_t将所有线程保留在thread_function()的开头,直到所有线程都在运行; (2)sched_yield(void)之后使用pthread_mutex_unlock()来提示系统运行新运行的线程。

第二,在任何情况下都不应将“条件变量”视为信号。为了使main()知道所有线程都已完成,您需要计数-可能是pthread_barrier_t;或者它可以是一个简单的整数,受互斥锁保护,并带有“条件变量”以使主线程在等待时保持打开状态;或者或者它可以是一个计数(在main()中)和一个信号量(每个线程在退出时都会发布一次)。

第三,您在pthread_cond_wait(&cv,&lock);中显示main()。此时,main() 必须拥有lock ...这很重要。但是:就目前情况而言,即使其他线程仍在运行,找到第list个空的 first 线程也会踢cv,并且main()会继续进行。尽管main()确实重新获取了lock,但随后仍在运行的任何线程都将退出或停留在lock上。 (一团糟。)


通常,使用“条件变量”的模板为:

    pthread_mutex_lock(&...lock) ;

    while (!(... thing we need ...))
      pthread_cond_wait(&...cond_var,&...lock) ;

    ... do stuff now we have what we need ....

    pthread_mutex_unlock(&...lock) ;

NB:“条件变量”没有值...尽管有名称,但它不是 not 标志,表示某些条件为真。本质上,“条件变量”是等待重新启动的线程队列。当发出“条件变量”的信号时,将至少重新启动一个等待线程-但是,如果没有线程在等待,则什么都不会发生,特别是(所谓的)“条件变量”保留信号的无记忆

在新代码中,遵循上述模板,main()应该:

    /* wait for threads .... */

    status = pthread_mutex_lock(&thread_lock);
    chcek_status(status);

    while (thread_finished_count != 3)
      {
        pthread_cond_wait(&thread_cv,&thread_lock) ;
        chcek_status(status);
      } ;

    status = pthread_mutex_unlock(&thread_lock) ;
    chcek_status(status);

那么这是怎么回事?

  1. main()正在等待thread_finished_count == 3

  2. thread_finished_count是受thread_lock互斥锁“保护”的共享变量。

    ...因此它在互斥量下的thread_function()中递增。

    ...和main()也必须在互斥锁下读取它。

  3. 如果main()找到thread_finished_count != 3,则必须等待。

    要这样做:pthread_cond_wait(&thread_cv,&thread_lock),其中:

    • 解锁thread_lock

    • 将线程放在等待线程的thread_cv队列中。

    ,并且原子地

  4. thread_function()执行pthread_cond_signal(&thread_cv)时,它将唤醒等待的线程。

  5. main()线程唤醒时,它将首先重新获取thread_lock ...

    ...因此它可以继续重新读取thread_finished_count,以查看现在是否为3

FWIW:我建议不要销毁互斥对象,直到之后所有线程都已加入。

,

我已经深入研究了glibc(至少在Linux和x86_64上为v2.30)如何实现pthread_mutex_lock()_unlock()

事实证明_lock()的工作方式如下:

  if (atomic_cmp_xchg(mutex->lock,1))
    return <OK> ;             // mutex->lock was 0,is now 1

  while (1)
    {
      if (atomic_xchg(mutex->lock,2) == 0)
        return <OK> ;        // mutex->lock was 0,is now 2

      ...do FUTEX_WAIT(2)... // suspend thread iff mutex->lock == 2...
    } ;

_unlock()的工作原理如下:

  if (atomic_xchg(mutex->lock,0) == 2)  // set mutex->lock == 0
    ...do FUTEX_WAKE(1)...               // if may have waiter(s) start 1

现在:

  • mutex->lock:0 =>已解锁,1 =>锁定但没有服务员,2 =>锁定了服务员

    针对没有锁争用且无需在FUTEX_WAKE中执行_unlock()的情况进行优化的“锁定但没有等待者”。

  • 函数_lock() / _unlock()在库中-在内核中不是

    ...尤其是互斥锁的所有权是库的事,不是内核的事。

  • FUTEX_WAIT(2)是对内核的调用,除非mutex->lock != 2,否则它将把线程放置在与互斥锁关联的挂起队列中。

    内核检查mutex->lock == 2并将线程从原子上添加到队列。这涉及在_unlock()之后调用atomic_xchg(mutex->lock,2)的情况。

  • FUTEX_WAKE(1)也是对内核的调用,futex手册页告诉我们:

    FUTEX_WAKE(从Linux 2.6.0开始)

    此操作最多唤醒正在等待的服务员的'val' ...无法保证唤醒哪些服务员(例如,不能保证调度优先级较高的服务员优先于优先级较低的服务生被唤醒。

    其中'val'在这种情况下为1。

    尽管文档说“不能保证唤醒哪些侍者”,但队列似乎至少是FIFO。

请特别注意:

  1. _unlock()不会将互斥锁传递给FUTEX_WAKE启动的线程。

  2. 一旦唤醒,线程将再次尝试获取锁...

    ...但是可能被其他任何正在运行的线程击败-包括刚刚执行_unlock()的线程。

我相信这就是为什么您没有看到跨线程共享工作的原因。每个人要做的工作很少,以至于一个线程可以解锁互斥锁,执行该工作然后返回以再次锁定该互斥锁之前被解锁唤醒的线程可以继续并成功锁定互斥锁。

本文链接:https://www.f2er.com/2270963.html

大家都在问