Pthread:相对于线程数,增加程序执行时间

我正在尝试使用pthreads C构建高效的并发哈希图。

以下是我的实现

#include <stdlib.h>
#include <stddef.h>
#include <pthread.h>
#include <stdint.h>
#include <limits.h>
#include <stdio.h>
#include <linux/limits.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>

#define ENTRIES_PER_BUCKET 3

struct Bucket
{
    pthread_mutex_t mutex;
    void **keys;
    int *vals;
    struct Bucket *next;
};

struct Concurrent_Map
{
    struct Bucket *buckets;
    map_keys_equality *keys_eq;
    map_key_hash *khash;
    int capacity;
};

int concurrent_map_allocate /*@ <t> @*/ (map_keys_equality *keq,map_key_hash *khash,unsigned capacity,struct Concurrent_Map **map_out)

{

    struct Concurrent_Map *old_map_val = *map_out;
    struct Concurrent_Map *map_alloc = malloc(sizeof(struct Concurrent_Map));
    if (map_alloc == NULL)
    {
        return 0;
    }
    *map_out = (struct Concurrent_Map *)map_alloc;

    struct Bucket *buckets_alloc = (struct Bucket *)malloc(sizeof(struct Bucket) * (int)capacity);

    if (buckets_alloc == NULL)
    {
        free(map_alloc);
        *map_out = old_map_val;
        return 0;
    }
    (*map_out)->buckets = buckets_alloc;
    (*map_out)->capacity = capacity;
    (*map_out)->keys_eq = keq;
    (*map_out)->khash = khash;

    unsigned i;

    for (i = 0; i < capacity; i++)
    {
        if (pthread_mutex_init(&((*map_out)->buckets[i].mutex),NULL) == 0)
        {
            void **key_alloc = malloc(sizeof(void *) * (ENTRIES_PER_BUCKET));

            if (key_alloc != NULL)
            {
                (*map_out)->buckets[i].keys = key_alloc;

                int k;
                for (k = 0; k < ENTRIES_PER_BUCKET; k++)
                {

                    (*map_out)->buckets[i].keys[k] = NULL;
                }
            }

            int *vals_alloc = malloc(sizeof(int) * (ENTRIES_PER_BUCKET));

            if (vals_alloc != NULL)
            {
                (*map_out)->buckets[i].vals = vals_alloc;

                int k;
                for (k = 0; k < ENTRIES_PER_BUCKET; k++)
                {
                    (*map_out)->buckets[i].vals[k] = -1;
                }
            }

            (*map_out)->buckets[i].next = NULL;
        }
    }

    // todo exceptions in allocation

    return 1;
}

static unsigned loop(unsigned k,unsigned capacity)
{
    unsigned g = k % capacity;

    unsigned res = (g + capacity) % capacity;

    return res;
}

int concurrent_map_get(struct Concurrent_Map *map,void *key,int *value_out)

{
    map_key_hash *khash = map->khash;
    unsigned hash = khash(key);

    unsigned start = loop(hash,map->capacity);
    unsigned bucket_index = loop(start + 0,map->capacity);

    if (bucket_index < map->capacity)
    {

        struct Bucket *bucket = &(map->buckets[bucket_index]);

        pthread_mutex_t mutex = bucket->mutex;

        pthread_mutex_lock(&mutex);

        int j;
        do
        {
            for (j = 0; j < ENTRIES_PER_BUCKET; j++)
            {
                int val = bucket->vals[j];
                if (map->keys_eq(bucket->keys[j],key))
                {
                    if (bucket->vals[j] == val)
                    {
                        *value_out = val;
                        return 1;
                    }
                    else
                    {
                        *value_out = -1;
                        return 0;
                    }
                }
            }
            if (bucket->next != NULL)
            {
                bucket = (bucket->next);
            }
            else
            {
                break;
                pthread_mutex_unlock(&mutex);
            }

            pthread_mutex_unlock(&mutex);

        } while (1);
    }
    *value_out = -1;
    return 0;
}

int concurrent_map_put(struct Concurrent_Map *map,int value)

{
    map_key_hash *khash = map->khash;
    unsigned hash = khash(key);

    unsigned start = loop(hash,map->capacity);

    struct Bucket *bucket = &(map->buckets[bucket_index]);

    int j;

    do
    {

        pthread_mutex_t mutex = bucket->mutex;

        int j;

        pthread_mutex_lock(&mutex);

        for (j = 0; j < ENTRIES_PER_BUCKET; j++)
        {
            if (map->keys_eq(bucket->keys[j],key))
            {
                pthread_mutex_unlock(&mutex);
                return 0;
            }
            else if (bucket->keys[j] == NULL)
            {
                bucket->vals[j] = value;
                bucket->keys[j] = key;
                pthread_mutex_unlock(&mutex);
                return 1;
            }
        }
        if (bucket->next == NULL)

        {
            // allocate a new bucket

            struct Bucket *new_bucket = malloc(sizeof(struct Bucket));

            if (pthread_mutex_init(&(new_bucket->mutex),NULL) == 0)
            {
                void **key_alloc = malloc(sizeof(void *) * (ENTRIES_PER_BUCKET));

                if (key_alloc != NULL)
                {
                    new_bucket->keys = key_alloc;

                    int k;
                    for (k = 0; k < ENTRIES_PER_BUCKET; k++)
                    {
                        new_bucket->keys[k] = NULL;
                    }
                }

                int *vals_alloc = malloc(sizeof(int) * (ENTRIES_PER_BUCKET));

                if (vals_alloc != NULL)
                {
                    new_bucket->vals = vals_alloc;

                    int k;
                    for (k = 0; k < ENTRIES_PER_BUCKET; k++)
                    {
                        new_bucket->vals[k] = -1;
                    }
                }

                bucket->next = new_bucket;
            }
        }

        pthread_mutex_unlock(&mutex);
        bucket = bucket->next;

    } while (1);

    return 0;
}

int concurrent_map_erase(struct Concurrent_Map *map,void **trash)

{

    map_key_hash *khash = map->khash;
    unsigned hash = khash(key);

    unsigned start = loop(hash,key))
            {
                bucket->vals[j] = -1;
                bucket->keys[j] = NULL;
                pthread_mutex_unlock(&mutex);
                return 1;
            }
        }

        pthread_mutex_unlock(&mutex);
        if (bucket->next != NULL)
        {
            bucket = (bucket->next);
        }
        else
        {
            break;
        }

    } while (1);
    return 0;
}

int concurrent_map_size(struct Concurrent_Map *map)

{
    int num_buckets = 0;

    struct Bucket *buckets = map->buckets;
    unsigned i;

    for (i = 0; i < map->capacity; i++)
    {
        struct Bucket bucket = buckets[i];
        do
        {
            num_buckets++;
            if (bucket.next != NULL)
            {
                bucket = *(bucket.next);
            }
            else
            {
                break;
            }

        } while (1);
    }
    return num_buckets * ENTRIES_PER_BUCKET;
}
struct FlowId
{
    int src_port;
    int dst_port;
    int src_ip;
    int dst_ip;
    int internal_device;
    int protocol;
};

bool FlowId_eq(void *a,void *b)

{
    if (a == NULL || b == NULL)
    {
        return false;
    }
    struct FlowId *id1 = a;
    struct FlowId *id2 = b;

    return (id1->src_port == id2->src_port) && (id1->dst_port == id2->dst_port) && (id1->src_ip == id2->src_ip) && (id1->dst_ip == id2->dst_ip) && (id1->internal_device == id2->internal_device) && (id1->protocol == id2->protocol);
}

unsigned FlowId_hash(void *obj)

{
    struct FlowId *id = obj;
    unsigned hash = 0;
    hash = __builtin_ia32_crc32si(hash,id->src_port);
    hash = __builtin_ia32_crc32si(hash,id->dst_port);
    hash = __builtin_ia32_crc32si(hash,id->src_ip);
    hash = __builtin_ia32_crc32si(hash,id->dst_ip);
    hash = __builtin_ia32_crc32si(hash,id->internal_device);
    hash = __builtin_ia32_crc32si(hash,id->protocol);
    return hash;
}

struct Concurrent_Map *concurrent_map;

#define NUM_THREADS 2
#define NUM_PACKETS 10000000

void *expirator(void *arg)
{
    // printf("Thread started executing\n");
    unsigned i = 0;
    int error = 0;
    unsigned packet_count = NUM_PACKETS / NUM_THREADS;
    while (i < packet_count)
    {
        i++;
        struct FlowId *id = malloc(sizeof(struct FlowId));
        struct FlowId *id1 = malloc(sizeof(struct FlowId));
        id->dst_ip = 1;
        id->src_ip = 1;
        id->internal_device = 1;
        id->protocol = 1;
        id->src_port = 1;
        id->dst_port = rand() % 65536;

        id1->dst_ip = 1;
        id1->src_ip = 1;
        id1->internal_device = 1;
        id1->protocol = 1;
        id1->src_port = 1;
        id1->dst_port = rand() % 65536;

        int external_port = rand() % 65536;
        int external;

        concurrent_map_erase(concurrent_map,id,NULL);

        concurrent_map_put(concurrent_map,external_port);
        concurrent_map_get(concurrent_map,&external);

        if (external_port != external)
        {
            error++;
        }
        else
        {
        }
    }
    return NULL;
}

int main()
{

    clock_t begin = clock();

    concurrent_map_allocate(FlowId_eq,FlowId_hash,65536,&(concurrent_map));

    pthread_t *threads = malloc(sizeof(pthread_t) * NUM_THREADS);
    int i;
    for (i = 0; i < NUM_THREADS; i++)
    {
        if (pthread_create(&threads[i],NULL,expirator,NULL) != 0)
        {
            printf("Error creating threads");
            exit(0);
        }
    }
    for (i = 0; i < NUM_THREADS; i++)
    {
        if (pthread_join(threads[i],NULL) != 0)
        {
            printf("Error joining threads");
            exit(0);
        }
    }
    clock_t end = clock();
    double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
    printf("%lf\n",time_spent);
    return 0;
}

这是如何运行此程序。

gcc  concurrent_map.c  -o test-concurrent-new -lpthread -msse4.2 -O3

然后,我测量固定工作负载的执行时间,以下是我观察到的时间值。

1:3.29

2:6.668711

3:5.88

4:6.23

5:6.38

6:6.52

7:6.74

8:6.82

似乎随着线程数量的增加,执行时间也会增加,并且几乎保持不变。

我使用Mutrace剖析了此代码,该代码查找互斥锁争用。事实证明

  

没有互斥量根据过滤参数而竞争。

我检查了高速缓存未命中的数量,结果发现,修改线程数后,高速缓存未命中的数量大致相等。

为什么线程数增加时执行时间不会减少?

我正在32核计算机上运行

nidi_2 回答:Pthread:相对于线程数,增加程序执行时间

rand()通常不适合多线程执行。而是使用rand_r()。

还使用Linux时间工具对应用程序进行计时。

您的工作负载生成会带来巨大的开销,我认为这是瓶颈,而不是并发哈希图

本文链接:https://www.f2er.com/3124854.html

大家都在问