执行矢量c的交集

前端之家收集整理的这篇文章主要介绍了执行矢量c的交集前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我有200个存储在vecOfVec中的大小为1到4000000的向量.我需要将这些向量与大小为9000个元素的单个向量“vecSearched”相交.我尝试使用以下代码执行相同操作,但是使用perf工具我发现我正在做的交叉点是我的代码中的瓶颈.我是否有某种方式可以执行有效的交叉路口
  1. #include <cstdlib>
  2. #include <iostream>
  3. #include <vector>
  4.  
  5. using namespace std;
  6.  
  7. int main(int argc,char** argv) {
  8. vector<vector<unsigned> > vecOfVec; //contains 120 vectors of size ranging from 1 to 2000000 elements. All vectors in vecOfVec are sorted
  9. vector<unsigned> vecSearched; //vector searched in contains 9000+ elements. Vectors in vecSearched are sorted
  10. for(unsigned kbt=0; kbt<vecOfVec.size(); kbt++)
  11. {
  12. //first find first 9 values spaced at equi-distant places,use these 9 values for performing comparisons
  13. vector<unsigned> equiSpacedVec;
  14.  
  15. if(((vecSearched[0]))>vecOfVec[kbt][(vecOfVec[kbt].size())-1]) //if beginning of searched vector > last value present in individual vectors of vecOfVec then continue
  16. {
  17. continue;
  18. }
  19.  
  20. unsigned elementIndex=0; //used for iterating over equiSpacedVec
  21. unsigned i=0; //used for iterating over individual buckets vecOfVec[kbt].second
  22. //search for value in bucket and store it in bucketValPos
  23. bool firstRun=true;
  24. for(vector<unsigned>::iterator itValPos=vecSearched.begin();itValPos!=vecSearched.end();++itValPos)
  25. {
  26. //construct a summarized vector out of individual vectors of vecOfVec
  27. if(firstRun)
  28. {
  29. firstRun=false;
  30. unsigned elementIndex1=0; //used for iterating over equiSpacedVec
  31. while(elementIndex1<(vecOfVec[kbt].size())) //create a small vector for skipping over the remaining vectors
  32. {
  33. if((elementIndex1+(10000))<(vecOfVec[kbt].size()))
  34. elementIndex1+=10000;
  35. else
  36. break;
  37. equiSpacedVec.push_back(vecOfVec[kbt][elementIndex1]);
  38. }
  39. }
  40. //skip individual vectors of vecOfVec using summarized vector constructed above
  41. while((!(equiSpacedVec.empty()))&&(equiSpacedVec.size()>(elementIndex+1))&&((*itValPos)>equiSpacedVec[elementIndex+1])){
  42. elementIndex+=1;
  43. if((i+100)<(vecOfVec[kbt].size()))
  44. i+=100;
  45. }
  46. unsigned j=i;
  47. while(((*itValPos)>vecOfVec[kbt][j])&&(j<vecOfVec[kbt].size())){
  48. j++;
  49. }
  50. if(j>(vecOfVec[kbt].size()-1)) //element not found even at last position.
  51. {
  52. break;
  53. }
  54.  
  55. if((*itValPos)==vecOfVec[kbt][j])
  56. {
  57. //store intersection result
  58. }
  59. }
  60. }
  61. return 0;
  62. }

解决方法

你的问题非常受欢迎.由于没有关联向量的数据,因此可以归结为加速两个向量之间的交集,基本上有两种方法

1.没有任何预处理

这通常由三件事来解决

>重复比较次数.例如,对于小向量(大小为1到50),您应该二元搜索每个元素以避免遍历主题向量的所有9000个元素.
>提高代码质量以减少分支错误预测,例如观察结果集通常比输入集更小的大小可以转换这样的代码

  1. while (Apos < Aend && Bpos < Bend) {
  2. if (A[Apos] == B[Bpos]) {
  3. C[Cpos++] = A[Apos];
  4. Apos++; Bpos++;
  5. }
  6. else if (A[Apos] > B[Bpos]) {
  7. Bpos++;
  8. }
  9. else {
  10. Apos++;
  11. }
  12. }

代码“展开”这样的比较创建虽然更容易预测分支(例如块大小= 2):

  1. while (1) {
  2. Adat0 = A[Apos]; Adat1 = A[Apos + 1];
  3. Bdat0 = B[Bpos]; Bdat1 = B[Bpos + 1];
  4. if (Adat0 == Bdat0) {
  5. C[Cpos++] = Adat0;
  6. }
  7. else if (Adat0 == Bdat1) {
  8. C[Cpos++] = Adat0;
  9. goto advanceB;
  10. }
  11. else if (Adat1 == Bdat0) {
  12. C[Cpos++] = Adat1;
  13. goto advanceA;
  14. }
  15. if (Adat1 == Bdat1) {
  16. C[Cpos++] = Adat1;
  17. goto advanceAB;
  18. }
  19. else if (Adat1 > Bdat1) goto advanceB;
  20. else goto advanceA;
  21. advanceA:
  22. Apos+=2;
  23. if (Apos >= Aend) { break; } else { continue; }
  24. advanceB:
  25. Bpos+=2;
  26. if (Bpos >= Bend) { break; } else { continue; }
  27. advanceAB:
  28. Apos+=2; Bpos+=2;
  29. if (Apos >= Aend || Bpos >= Bend) { break; }
  30. }
  31. // fall back to naive algorithm for remaining elements

>使用SIMD指令执行块操作

这些技术很难在质量保证环境中描述,但您可以阅读它们(以及相关的优化,如if转换)herehere或查找实现元素here

2.进行预处理

这个恕我直言是一个更好的方式,因为你有一个大小为9000元素的主题向量.您可以从中创建一个间隔树,或者只是找到一种索引它的方法,例如,创建一个可以加快搜索速度的结构:

  1. vector<unsigned> subject(9000+);
  2. vector<range> index(9000/M);

范围是一个类似的结构

  1. struct range {
  2. unsigned min,max;
  3. };

从而创建一系列范围

  1. [0,100],[101,857],... [33221,33500]

这将允许在进行交集时跳过许多比较(例如,如果另一组的元素大于子范围的最大值,则可以完全跳过该子范围)

3.并行化

是的,在两个列表中总是有第三个元素:P.如果您已经对程序进行了足够的优化(并且只有那时),请将您的工作分解为块并并行运行.这个问题符合一个令人尴尬的模式,所以200个向量对比1应该定义为“50对1并发4次”

测试,测量,重新设计!!

猜你在找的C&C++相关文章