Java和计算中的线程

我是Java的新手,我正在尝试编写一个带有两个参数的程序:

  1. 我们必须对素数求和的数字
  2. 我们必须执行此操作的线程数

因此,我使用一种名为 Eratosthene 的方法,该方法存储一个 boolean 数组,如果数字是素数,则将其标记为true,然后标记所有的倍数这个数字是错误的。

我尝试将我的数组划分为每个线程的子数组,并在每个子数组中进行操作,最后将所有子数组的结果求和。

但是我不知道我在哪里做错了:有时程序不能给出好的结果。

这是我的代码:

SumPrime.java

db.collection.aggregate([
  {
    $sort: {
      // Sort by the book_version from highest to lowest 
      book_version: -1
    }
  },{
    $group: {
      // Group the documents by their book_id
      _id: "$book_id",book_version: {
        // The first book_version is the highest book version
        // The same thing applies to every other first-of other book fields 
        $first: "$book_version"
      },book_id: {
        $first: "$book_id"
      },author: {
        "$first": "$author"
      },name: {
        "$first": "$name"
      },comments: {
        "$first": "$comments"
      }
    }
  }
])

您可以这样运行程序:

ridership_year = ridership_mg.groupby(['stationame','year']).monthtotal.sum().reset_index()
sns.lmplot(x = 'year',y = 'monthtotal',col ='stationame',data= ridership_year,col_wrap = 5)

我愿意接受任何有关改进我的代码的建议。

cumtlhf 回答:Java和计算中的线程

您需要完全重新考虑线程的逻辑。

各个线程无法访问array的相同范围,例如如果线程具有min = 100max = 150,则只能使用和/或更改范围在100到149(含)之间的元素。

您的代码:

for (int i = min; i < max; i++) {
    if (array[i]) {
        for (int j = min; j*i < array.length; j++) {
            array[i*j] = false;

i = 100,j = 100开头,从而生成i*j = 10000。如果array真的那么大,则意味着您访问array[10000],但这是不允许的。当然,数组不是那么大,所以代码什么都不做

啊,您说,第一个线程有min = 0max = 50,所以它将更改从索引0(0 * 0)到2401(49 * 49)的值,并且由于数组是小于此值,它将更新整个数组,但是 不允许

现在,再考虑一下。

如果范围是min = 100,max = 150,则需要先清除该范围内的所有偶数,然后清除所有可被3整除的数字,再清除所有...,依此类推,但仅限于范围

我会让你重新考虑逻辑。


更新

要将Sieve of Eratosthenes应用于某个范围,我们需要质数一直到该范围的最大值的平方根。

如果范围是min = 150,max = 200,然后是maxPrime = sqrt(200) = 14,那么我们需要从2到14(含)的质数,那么我们可以将范围150-199更新。

假设我们首先更新array来找到2-14范围内的所有素数,我们可以使用它来迭代目标范围内(150-199)的那些素数的倍数 。为此,我们需要从素数的最低倍数> = min开始,因此我们需要将min舍入到prime的下一个倍数。

使用整数数学,到round up to next multiple,我们计算:

lower = (min + prime - 1) / prime * prime

这为我们提供了主要逻辑:

maxPrime = (int) Math.sqrt(max);
for (int prime = 2; prime <= maxPrime; prime++) {
    if (array[prime]) {
        int lower = (min + prime - 1) / prime * prime;
        for (int i = lower; i < max; i += prime)
            array[i] = false

我们还应该让每个线程负责首先设置范围内的所有布尔值,以便该部分也成为多线程。

现在,主逻辑必须首先在主线程中找到2-sqrt(N)范围内的质数,然后在线程之间分配剩余范围。

这是我的尝试:

public static long sumPrimes(int n,int threadCount) {
    // Find and sum the "seed" primes needed by the threads
    int maxSeedPrime = (int) Math.sqrt(n + 2); // extra to be sure no "float errors" occur
    boolean[] seedPrime = new boolean[maxSeedPrime + 1];
    AtomicLong totalSum = new AtomicLong(sumPrimes(seedPrime,seedPrime,maxSeedPrime));

    // Split remaining into ranges and start threads to calculate sums
    Thread[] threads = new Thread[threadCount];
    for (int t = 0,rangeMin = maxSeedPrime + 1; t < threadCount; t++) {
        int min = rangeMin;
        int max = min + (n - min + 1) / (threadCount - t) - 1;
        threads[t] = new Thread(() ->
            totalSum.addAndGet(sumPrimes(seedPrime,new boolean[max - min + 1],min,max))
        );
        threads[t].start();
        rangeMin = max + 1;
    }

    // Wait for threads to end
    for (int t = 0; t < threadCount; t++) {
        try {
            threads[t].join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // Return the calculated sum
    return totalSum.get();
}
private static long sumPrimes(boolean[] seedPrime,boolean[] rangePrime,int min,int max/*inclusive*/) {
    // Initialize range
    for (int i = Math.max(min,2); i <= max; i++) {
        rangePrime[i - min] = true;
    }

    // Mark non-primes in range
    int maxPrime = (int) Math.sqrt(max + 1); // extra to be sure no "float errors" occur
    for (int prime = 2; prime <= maxPrime; prime++) {
        if (seedPrime[prime]) {
            int minMultiple = (min + prime - 1) / prime * prime;
            if (minMultiple <= prime)
                minMultiple = prime * 2;
            for (int multiple = minMultiple; multiple <= max ; multiple += prime) {
                rangePrime[multiple - min] = false;
            }
        }
    }

    // Sum the primes
    long sum = 0;
    for (int prime = min; prime <= max; prime++) {
        if (rangePrime[prime - min]) {
            sum += prime;
        }
    }
    return sum;
}

测试

public static void main(String[] args) {
    test(1000,3);
    test(100000000,4);
}
public static void test(int n,int threadCount) {
    long start = System.nanoTime();
    long sum = sumPrimes(n,threadCount);
    long end = System.nanoTime();
    System.out.printf("sumPrimes(%,d,%d) = %,d (%.9f seconds)%n",n,threadCount,sum,(end - start) / 1e9);
}

输出

sumPrimes(1,000,3) = 76,127 (0.005595600 seconds)
sumPrimes(100,4) = 279,209,790,387,276 (0.686881000 seconds)

更新2

上面的代码使用的是lambda表达式:

threads[t] = new Thread(() ->
    totalSum.addAndGet(sumPrimes(seedPrime,max))
);

如果您不想使用lambda表达式,例如因此它将在Java 7上运行,您可以改用匿名类:

threads[t] = new Thread() {
    @Override
    public void run() {
        totalSum.addAndGet(sumPrimes(seedPrime,max));
    }
};
,

多线程通常也意味着您想更快地完成一些工作。因此,首先值得回顾一下您的初始设计,并使其在单线程上更快。然后,这是一个目标。另外,要在不编写精确基准的情况下比较运行时间,则需要“可见”长度的运行时间。
在我的机器上,通过“设置”

int max = 1_000_000_000;
boolean sieve[] = new boolean[max];
long sum = 0; // will be 24739512092254535 at the end

您的原始代码

for(int i=2;i<max;i++)
    if(!sieve[i]) {
        for(int j=i*2;j<max;j+=i)
            sieve[j]=true;
        sum+=i;
    }

运行24-28秒。正如@Andreas帖子下方评论中所讨论的,以及稍后的内容(是的,现在我看到它被接受并且大部分讨论都已经过去了),内部循环进行了许多额外的检查(因为它始终进行一次比较,即使它实际上不会启动)。因此,外循环可以分为两部分:首先进行筛选和求和(直到max的最后一个“未知”除数,该除数不超过其平方根),然后对其余部分求和:

int maxunique=(int)Math.sqrt(max);
for(int i=2;i<=maxunique;i++)
    if(!sieve[i]) {
        for(int j=i*2;j<max;j+=i)
            sieve[j]=true;
        sum+=i;
    }
for(int i=maxunique+1;i<max;i++)
    if(!sieve[i])
        sum+=i;

此计算机在我的计算机上运行14-16秒。收益巨大,而且还没有涉及线程。

然后出现线程,以及if(!sieve[i])的问题:在计算总和时,不得在素数小于i的内部循环超过{{1}之前,进行这种检查},因此i真正说明了它是否是素数。因为例如,如果某个线程像sieve[i]一样运行,而另一个线程同时在检查for(int i=4;i<10001;i+=2)sieve[i]=true;,则它仍将是sieve[10000],而false将是误认为素数。
第一次尝试可能是在一个线程上进行筛选(无论如何,其外循环“仅”进入10000的平方根),然后并行求和:

max

这有点整洁,所有线程(我有4个内核)检查相同数量的候选对象,并且结果更快。有时会快一秒钟,但通常会缩短一半(〜0.4 ...〜0.8秒)。因此,这确实不值得付出努力,筛分循环是此处最耗时的部分。

一个人可以决定允许多余的工作,并为筛子中遇到的每个素数数字启动一个线程,即使它不是实际的素数,也只是没有被剔除:

for(int i=2;i<=maxunique;i++)
    if(!sieve[i])
        for(int j=i*2;j<max;j+=i)
            sieve[j]=true;

int numt=4;
Thread sumt[]=new Thread[numt];
long sums[]=new long[numt];
for(int i=0;i<numt;i++) {
    long ii=i;
    Thread t=sumt[i]=new Thread(new Runnable() {
        public void run() {
            int from=(int)Math.max(ii*max/numt,2);
            int to=(int)Math.min((ii+1)*max/numt,max);
            long sum=0;
            for(int i=from;i<to;i++)
                if(!sieve[i])
                    sum+=i;
            sums[(int)ii]=sum;
        }
    });
    t.start();
}

for(int i=0;i<sumt.length;i++) {
    sumt[i].join();
    sum+=sums[i];
}

经过评论的List<Thread> threads=new ArrayList<>(); for(int i=2;i<=maxunique;i++) if(!sieve[i]) { int ii=i; Thread t=new Thread(new Runnable() { public void run() { for(int j=ii*2;j<max;j+=ii) sieve[j]=true; } }); t.start(); threads.add(t); } //System.out.println(threads.size()); for(int i=0;i<threads.size();i++) threads.get(i).join(); for(int i=maxunique+1;i<max;i++) if(!sieve[i]) sum+=i; 会(在我的机器上)告诉我们创建了3500-3700个线程(如果有人在原始循环中放入一个计数器,结果表明3401是最小的,许多质数在单线程sieve循环中遇到)。尽管过冲不会造成灾难性的影响,但线程数非常高,并且增益也不算太出色,尽管它比上一次尝试更明显:运行时间为10-11秒(当然可以降低一半)通过使用并行求和循环获得更多秒)。
当循环原来被过滤为非素数时,可以通过关闭循环来解决一些冗余工作:

println()

这实际上起到了一定的作用,对我来说,运行时间为8.6-10.1秒。

由于创建3401线程并不比创建3700线程少很多,因此限制它们的数量可能是个好主意,这是向for(int j=ii*2;j<max && !sieve[ii];j+=ii) 挥手告别的地方。尽管从技术上讲可以计算出它们的数量,但是有各种内置的基础架构可以为我们做到这一点。
Executors可以帮助将线程数限制为固定数量(Thread),或者甚至更好的是,将线程数限制为可用的CPU数量(newFixedThreadPool()):

newWorkStealingPool()

这样,它产生的结果与上一个(8.6-10.5s)相似。但是,对于较少的CPU数量(4个内核),条件交换会导致加速(取消注释ExecutorService es=Executors.newWorkStealingPool(); ExecutorCompletionService<Object> ecs=new ExecutorCompletionService<Object>(es); int count=0; for(int i=2;i<=maxunique;i++) if(!sieve[i]) { int ii=i; count++; ecs.submit(new Callable<Object>() { public Object call() throws Exception { // if(!sieve[ii]) for(int j=ii*2;j<max /**/ && !sieve[ii] /**/;j+=ii) sieve[j]=true; return null; } }); } System.out.println(count); while(count-->0) ecs.take(); es.shutdown(); long sum=0; for(int i=2;i<max;i++) if(!sieve[i]) sum+=i; 并在if之间的循环中注释相同的条件),因为任务在提交时运行顺序,因此大多数冗余循环可以从一开始就退出,从而使重复检查浪费时间。那对我来说是8.5-9.3s,超过了直接线程尝试的最佳和最差时间。但是,如果您的CPU数量很高(我也根据/**/在超级计算节点上运行了32个可用内核),那么任务将重叠更多,并且是未分类的版本(因此总是进行检查) )将会更快。

如果您想以较小的速度提高可读性,则可以使用流并行化内部循环(也可以Runtime.availableProcessors()并行化),

Thread

这非常类似于原始的优化循环对,但对我来说仍然有9.4-10.0秒的速度。因此它比其他方法慢(约10%左右),但要简单得多。


更新:

  1. 我修复了一系列不一的错误:long sum=0; for(int i=2;i<=maxunique;i++) if(!sieve[i]) { sum+=i; int ii=i; IntStream.range(1,(max-1)/i).parallel().forEach( j -> sieve[ii+j*ii]=true); } for(int i=maxunique+1;i<max;i++) if(!sieve[i]) sum+=i; xy<maxunique s。尽管它不幸/不幸地没有影响巨大的结果,但是它在诸如xy<=maxunique这样的简单情况下确实失败了(当max=9并带有maxunique=3循环时,9仍然是质数,并且总和是26,而不是17)。嗯也修复了一些连续循环(因此,它们从xy<3开始)。

  2. 创建无数个子任务困扰着我,幸运的是发现了一个倒置的设计,我们不检查是否达到maxunique+1(即sqrt(max)),但是我们知道这样,如果我们完成了对低于某个特定maxunique的数字的筛选,那么我们可以继续检查直至limit的数字,因为在此范围内limit*limit ... {{1 }}确实是一个质数(并且我们仍然要记住,此上限受limit的限制)。这样就可以并行筛选。

基本算法,仅用于检查(单线程):

limit*limit

由于某种原因,它比原始的双循环变体要慢一些(13.8-14.5秒对13.7-14.0秒,最小/最大20个运行),但无论如何我还是对并行化感兴趣。
可能由于质数的不均匀分布,使用并行流效果不佳(我认为这只是将工作按看似相等的方式预先划分),但是基于maxunique的方法效果很好:

int limit=2;
do {
    int upper=Math.min(maxunique+1,limit*limit);
    for(int i=limit;i<upper;i++)
        if(!sieve[i]) {
            sum+=i;
            for(int j=i*2;j<max;j+=i)
                sieve[j]=true;
        }
    limit=upper;
} while(limit<=maxunique);

for(int i=limit;i<max;i++)
    if(!sieve[i])
        sum+=i;

对于CPU数量较少的环境,这是迄今为止最快的(7.4-9.0秒,“无限线程数”为8.7-9.9秒,而其他{{1 }})。但是,一开始它运行的并行任务数量很少(Executor时,它仅启动两个并行循环,分别用于2和3),最重要的是,它们是运行时间最长的循环(最小的循环)步骤),因此,在高CPU数量的环境中,它仅比基于ExecutorService es=Executors.newWorkStealingPool(); ExecutorCompletionService<Object> ecs=new ExecutorCompletionService<>(es); int limit=2; int count=0; do { int upper=Math.min(maxunique+1,limit*limit); for(int i=limit;i<upper;i++) if(!sieve[i]) { sum+=i; int ii=i; count++; ecs.submit(new Callable<Object>() { public Object call() throws Exception { for(int j=ii*2;j<max;j+=ii) sieve[j]=true; return null; } }); } while(count>0) { count--; ecs.take(); } limit=upper; } while(limit<=maxunique); es.shutdown(); for(int i=limit;i<max;i++) if(!sieve[i]) sum+=i; 的原始方法落后2.9-3.6秒和2.7-3.2秒,仅排在第二位。
当然,一个人可以在开始时实施单独的加速,明确地收集必要数量的质数以使可用核饱和,然后切换到这种基于Executor的方法,然后结果可能会胜过其他方法。不管核心数是多少。但是我认为我现在可以抵抗诱惑。

,

我认为您的问题是此代码:

   public void run() {
        for (int i = min; i < max; i++) {
            if (array[i]) {
                for (int j = min; j*i < array.length; j++) {
                    array[i*j] = false;
                }
                sum += i;
            }
        }
        allFinished.release();
    }

想象一下您后面的一个线程,在列表末尾附近工作。第一项不是质数,但识别它不是质数的工作尚未实现-它来自不同的线程,而该线程才刚刚开始。因此,您认为该值是质数(尚未标记为非质数)并且可以正常工作。

如果您提供一个产生不良结果的示例,那么我们可以轻松地测试该理论。

本文链接:https://www.f2er.com/3122938.html

大家都在问