[C#] 对图像进行垂直翻转(FlipY)的跨平台SIMD硬件加速向量算法,兼谈并行处理收益极少的原因

  • [C#] 对图像进行垂直翻转(FlipY)的跨平台SIMD硬件加速向量算法,兼谈并行处理收益极少的原因已关闭评论
  • 3 次浏览
  • A+
所属分类:.NET技术
摘要

垂直翻转的算法原理是很简单的,就是将位图的每一行重新排列——用i表示当前行的话,那么垂直翻转的规律是——

将位图进行水平翻转或垂直翻转,这些是图像处理的常用算法。本文介绍最简单的垂直翻转,便于后续文章来探讨复杂的水平翻转算法。
本文除了会给出标量算法外,还会给出向量算法。且这些算法是跨平台的,同一份源代码,能在 X86及Arm架构上运行,且均享有SIMD硬件加速。

一、标量算法

1.1 算法实现

垂直翻转的算法原理是很简单的,就是将位图的每一行重新排列——

  • 第0行放到第(height-1)行;
  • 第1行放到第(height-1-1)行;
  • 第2行放到第(height-1-2)行;

用i表示当前行的话,那么垂直翻转的规律是——

  • 第i行放到第(height-1-i)行。

由于此时是整行数据的复制,所以无需关心像素格式。算好每行的字节数后,便可以用字节复制的办法来复制一行数据了。而水平翻转需区分不同的像素格式,比垂直翻转要复杂的多,后面的文章会详细讲解水平翻转。
下面就是标量算法的源代码。

public static unsafe void ScalarDoBatch(byte* pSrc, int strideSrc, int width, int height, byte* pDst, int strideDst) {     int strideCommon = Math.Min(Math.Abs(strideSrc), Math.Abs(strideDst));     byte* pRow = pSrc;     byte* qRow = pDst + (height - 1) * (long)strideDst; // Set to last row.     for (int i = 0; i < height; i++) {         byte* p = pRow;         byte* q = qRow;         for (int j = 0; j < strideCommon; j++) {             *q++ = *p++;         }         pRow += strideSrc;         qRow -= strideDst;     } } 

为了支持任意的像素格式,可以根据 stride(跨距)来计算每一行的字节数。注意stride有可能是负数,故需要用Abs函数来计算绝对值。计算出strideCommon后,内循环便根据它,对当前行的数据进行逐个字节复制处理,直至复制完一整行。

外循环采用了“顺序读取、逆序写入”的策略。具体来说——

  • 读取是从第0行开始的,每次循环后移动到下一行。于是在上面的源代码中,pRow的初值就是 pSrc,每次循环后pRow会 加上 跨距strideSrc。
  • 写入是从最后一行开始的,每次循环后移动到前一行。于是在上面的源代码中,qRow的初值是 pDst + (height - 1) * (long)strideDst(目标位图最后一行的地址),每次循环后qRow会 减去 跨距strideDst。

1.2 基准测试代码

使用 BenchmarkDotNet 进行基准测试。
可以使用上一篇文章的公共函数,写好标量算法的基准测试代码。源代码如下。

[Benchmark(Baseline = true)] public void Scalar() {     ScalarDo(_sourceBitmapData, _destinationBitmapData, false); }  [Benchmark] public void ScalarParallel() {     ScalarDo(_sourceBitmapData, _destinationBitmapData, true); }  public static unsafe void ScalarDo(BitmapData src, BitmapData dst, bool useParallel = false) {     int width = src.Width;     int height = src.Height;     int strideSrc = src.Stride;     int strideDst = dst.Stride;     byte* pSrc = (byte*)src.Scan0.ToPointer();     byte* pDst = (byte*)dst.Scan0.ToPointer();     bool allowParallel = useParallel && (height > 16) && (Environment.ProcessorCount > 1);     if (allowParallel) {         Parallel.For(0, height, i => {             int start = i;             int len = 1;             byte* pSrc2 = pSrc + start * (long)strideSrc;             byte* pDst2 = pDst + (height - 1 - start) * (long)strideDst;             ScalarDoBatch(pSrc2, strideSrc, width, len, pDst2, strideDst);         });     } else {         ScalarDoBatch(pSrc, strideSrc, width, height, pDst, strideDst);     } } 

由于现在是图像垂直翻转,于是并行算法需按照同样的规则来计算数据的地址。即上面提到的“顺序读取、逆序写入”。
具体来说——

  • pSrc2 是源数据的指针。由于是顺序读取,故它的计算办法是 pSrc + start * (long)strideSrc.
  • pDst2 是目标数据的指针。由于是逆序写入,故它的计算办法是 pDst + (height - 1 - start) * (long)strideDst. 编写代码时有时可能会漏掉了其中的"- 1",这是一种常见错误,容易导致指针访问非法地址等问题。请大家一定要细心。

二、向量算法

2.1 算法思路

上面的标量算法是每次复制1个字节,而向量算法可以每次复制1个向量。

若一行数据的字节数,恰好是向量大小的整数倍的话,那可以直接写个简单循环来处理。非常简单。

但在实际使用中,字节数大多数时候并不是向量大小的整数倍,此时处理起来会复杂一些。

2.1.1 解决非整数倍带来的难点

最保守的办法是——先用向量算法将整数倍的数据给处理完,随后对于末尾的数据,退回为标量算法来处理。

该办法确实有效,但存在以下缺点:

  • 需要编写2种算法——向量的和标量的。会带来更多开发成本与维护成本。
  • 末尾的数据是标量处理的,向量硬件加速不起作用。

有没有办法只需写1种算法,且对末尾的数据也能向量化处理呢?

对于“一边读取、一边写入”这种情况,由于读、写的一般是不同的数据区域,故有一种办法来实现上面的期望。该办法的关键思路是——先计算好“末尾指针”(向量处理时最后一笔数据的指针地址),然后在循环内每次均检查指针地址,当发现当前指针已经越过“末尾指针”时便强制将当前指针设置为“末尾指针”,以完成剩余数据的处理。

使用这种办法,能使末尾的数据也能向量化处理。代价就是——部分末尾的数据会被重复处理。但由于现在是“一边读取、一边写入”,故重复处理并不会带来负面影响。

在向量运算的循环中,增加指针地址判断的分支代码,的确会影响性能。但是由于现在CPU都拥有强大的分支预测能力,对于“末尾指针”判断这种在最后一次循环时才生效的分支,分支预测绝大多数是成功的,从而掩盖了该分支判断的花费。

其实本系列的前面几篇文章,也是使用这一办法,使末尾的数据也能向量化处理。所以代码中有“The last block is also use vector”的注释。

(另注:在C语言中,C99标准新增了restrict关键字来标记“指针指向不同的数据区域”情况。restrict用于限定和约束指针,表示这个指针只访问这块内存的唯一方式,也就是告诉编译器,这块内存中的内容的操作都只会通过这个指针,而不会通过其他变量或者指针。C# 目前还没有这样的关键字)

2.2 算法实现

根据上面的思路,编写代码。源代码如下。

public static unsafe void UseVectorsDoBatch(byte* pSrc, int strideSrc, int width, int height, byte* pDst, int strideDst) {     int strideCommon = Math.Min(Math.Abs(strideSrc), Math.Abs(strideDst));     int vectorWidth = Vector<byte>.Count;     int maxX = strideCommon - vectorWidth;     byte* pRow = pSrc;     byte* qRow = pDst + (height - 1) * (long)strideDst; // Set to last row.     for (int i = 0; i < height; i++) {         Vector<byte>* pLast = (Vector<byte>*)(pRow + maxX);         Vector<byte>* qLast = (Vector<byte>*)(qRow + maxX);         Vector<byte>* p = (Vector<byte>*)pRow;         Vector<byte>* q = (Vector<byte>*)qRow;         for (; ; ) {             Vector<byte> data;             // Load.             data = *p;             // Store.             *q = data;             // Next.             if (p >= pLast) break;             ++p;             ++q;             if (p > pLast) p = pLast; // The last block is also use vector.             if (q > qLast) q = qLast;         }         pRow += strideSrc;         qRow -= strideDst;     } } 

由于只需要利用向量类型来做内存复制,于是可以不用调用 Vector静态类中的方法。甚至连Load、Store 方法也无需使用,因为对向量类型类型进行指针读写操作时,编译器会自动生成“未对齐加载”、“未对齐存储”的指令。

2.3 基准测试代码

随后为该算法编写基准测试代码。

[Benchmark] public void UseVectors() {     UseVectorsDo(_sourceBitmapData, _destinationBitmapData, false); }  [Benchmark] public void UseVectorsParallel() {     UseVectorsDo(_sourceBitmapData, _destinationBitmapData, true); }  public static unsafe void UseVectorsDo(BitmapData src, BitmapData dst, bool useParallel = false) {     int vectorWidth = Vector<byte>.Count;     int width = src.Width;     int height = src.Height;     if (width <= vectorWidth) {         ScalarDo(src, dst, useParallel);         return;     }     int strideSrc = src.Stride;     int strideDst = dst.Stride;     byte* pSrc = (byte*)src.Scan0.ToPointer();     byte* pDst = (byte*)dst.Scan0.ToPointer();     bool allowParallel = useParallel && (height > 16) && (Environment.ProcessorCount > 1);     if (allowParallel) {         Parallel.For(0, height, i => {             int start = i;             int len = 1;             byte* pSrc2 = pSrc + start * (long)strideSrc;             byte* pDst2 = pDst + (height - 1 - start) * (long)strideDst;             UseVectorsDoBatch(pSrc2, strideSrc, width, len, pDst2, strideDst);         });     } else {         UseVectorsDoBatch(pSrc, strideSrc, width, height, pDst, strideDst);     } } 

也是需要细心处理并行时的地址计算。

三、使用系统所提供的MemoryCopy方法进行复制

内循环只是做复制一整行数据的操作,恰好 .NET 系统提供了内存复制的方法“Buffer.MemoryCopy”。于是,利用它也可以完成任务。源代码如下。

public static unsafe void UseCopyDoBatch(byte* pSrc, int strideSrc, int width, int height, byte* pDst, int strideDst) {     int strideCommon = Math.Min(Math.Abs(strideSrc), Math.Abs(strideDst));     byte* pRow = pSrc;     byte* qRow = pDst + (height - 1) * (long)strideDst; // Set to last row.     for (int i = 0; i < height; i++) {         Buffer.MemoryCopy(pRow, qRow, strideCommon, strideCommon);         pRow += strideSrc;         qRow -= strideDst;     } } 

代码变得更简洁了。

四、对算法进行检查

可按照上一篇的办法,使用SumDifference函数来对各种算法进行检查。
且由于现在是做图像的垂直翻转运算,翻转2次后便能变为原样。于是,现在还可以对 Scalar 算法进行检查。源代码如下。

bool allowCheck = true; if (allowCheck) {     try {         TextWriter writer = Console.Out;         long totalDifference, countByteDifference;         int maxDifference;         double averageDifference;         long totalByte = Width * Height * 3;         double percentDifference;         // Baseline         ScalarDo(_sourceBitmapData, _expectedBitmapData);         // Scalar         ScalarDo(_expectedBitmapData, _destinationBitmapData);         totalDifference = SumDifference(_sourceBitmapData, _destinationBitmapData, out countByteDifference, out maxDifference);         averageDifference = (countByteDifference > 0) ? (double)totalDifference / countByteDifference : 0;         percentDifference = 100.0 * countByteDifference / totalByte;         writer.WriteLine(string.Format("Difference of Scalar: {0}/{1}={2}, max={3}, percentDifference={4:0.000000}%", totalDifference, countByteDifference, averageDifference, maxDifference, percentDifference));         // ScalarParallel         ScalarParallel();         totalDifference = SumDifference(_expectedBitmapData, _destinationBitmapData, out countByteDifference, out maxDifference);         averageDifference = (countByteDifference > 0) ? (double)totalDifference / countByteDifference : 0;         percentDifference = 100.0 * countByteDifference / totalByte;         writer.WriteLine(string.Format("Difference of ScalarParallel: {0}/{1}={2}, max={3}, percentDifference={4:0.000000}%", totalDifference, countByteDifference, averageDifference, maxDifference, percentDifference));         // UseVectors         UseVectors();         totalDifference = SumDifference(_expectedBitmapData, _destinationBitmapData, out countByteDifference, out maxDifference);         averageDifference = (countByteDifference > 0) ? (double)totalDifference / countByteDifference : 0;         percentDifference = 100.0 * countByteDifference / totalByte;         writer.WriteLine(string.Format("Difference of UseVectors: {0}/{1}={2}, max={3}, percentDifference={4:0.000000}%", totalDifference, countByteDifference, averageDifference, maxDifference, percentDifference));         // UseVectorsParallel         UseVectorsParallel();         totalDifference = SumDifference(_expectedBitmapData, _destinationBitmapData, out countByteDifference, out maxDifference);         averageDifference = (countByteDifference > 0) ? (double)totalDifference / countByteDifference : 0;         percentDifference = 100.0 * countByteDifference / totalByte;         writer.WriteLine(string.Format("Difference of UseVectorsParallel: {0}/{1}={2}, max={3}, percentDifference={4:0.000000}%", totalDifference, countByteDifference, averageDifference, maxDifference, percentDifference));         // UseCopy         UseCopy();         totalDifference = SumDifference(_expectedBitmapData, _destinationBitmapData, out countByteDifference, out maxDifference);         averageDifference = (countByteDifference > 0) ? (double)totalDifference / countByteDifference : 0;         percentDifference = 100.0 * countByteDifference / totalByte;         writer.WriteLine(string.Format("Difference of UseCopy: {0}/{1}={2}, max={3}, percentDifference={4:0.000000}%", totalDifference, countByteDifference, averageDifference, maxDifference, percentDifference));         // UseCopyParallel         UseCopyParallel();         totalDifference = SumDifference(_expectedBitmapData, _destinationBitmapData, out countByteDifference, out maxDifference);         averageDifference = (countByteDifference > 0) ? (double)totalDifference / countByteDifference : 0;         percentDifference = 100.0 * countByteDifference / totalByte;         writer.WriteLine(string.Format("Difference of UseCopyParallel: {0}/{1}={2}, max={3}, percentDifference={4:0.000000}%", totalDifference, countByteDifference, averageDifference, maxDifference, percentDifference));     } catch (Exception ex) {         Debug.WriteLine(ex.ToString());     } } 

五、基准测试结果

5.1 X86 架构

X86架构下的基准测试结果如下。

BenchmarkDotNet v0.14.0, Windows 11 (10.0.22631.4541/23H2/2023Update/SunValley3) AMD Ryzen 7 7840H w/ Radeon 780M Graphics, 1 CPU, 16 logical and 8 physical cores .NET SDK 8.0.403   [Host]     : .NET 8.0.10 (8.0.1024.46610), X64 RyuJIT AVX-512F+CD+BW+DQ+VL+VBMI   DefaultJob : .NET 8.0.10 (8.0.1024.46610), X64 RyuJIT AVX-512F+CD+BW+DQ+VL+VBMI   | Method             | Width | Mean         | Error      | StdDev     | Ratio | RatioSD | |------------------- |------ |-------------:|-----------:|-----------:|------:|--------:| | Scalar             | 1024  |  1,077.72 us |  20.704 us |  24.647 us |  1.00 |    0.03 | | ScalarParallel     | 1024  |    177.58 us |   3.489 us |   3.263 us |  0.16 |    0.00 | | UseVectors         | 1024  |     79.40 us |   1.549 us |   2.713 us |  0.07 |    0.00 | | UseVectorsParallel | 1024  |     19.54 us |   0.373 us |   0.547 us |  0.02 |    0.00 | | UseCopy            | 1024  |     81.88 us |   1.608 us |   2.034 us |  0.08 |    0.00 | | UseCopyParallel    | 1024  |     18.28 us |   0.357 us |   0.351 us |  0.02 |    0.00 | |                    |       |              |            |            |       |         | | Scalar             | 2048  |  4,360.82 us |  52.264 us |  48.888 us |  1.00 |    0.02 | | ScalarParallel     | 2048  |    717.40 us |  13.745 us |  13.499 us |  0.16 |    0.00 | | UseVectors         | 2048  |    992.42 us |  19.805 us |  57.457 us |  0.23 |    0.01 | | UseVectorsParallel | 2048  |    409.04 us |   8.070 us |  19.022 us |  0.09 |    0.00 | | UseCopy            | 2048  |  1,002.18 us |  19.600 us |  27.476 us |  0.23 |    0.01 | | UseCopyParallel    | 2048  |    418.30 us |   6.980 us |   5.449 us |  0.10 |    0.00 | |                    |       |              |            |            |       |         | | Scalar             | 4096  | 16,913.07 us | 244.574 us | 216.808 us |  1.00 |    0.02 | | ScalarParallel     | 4096  |  3,844.09 us |  46.626 us |  43.614 us |  0.23 |    0.00 | | UseVectors         | 4096  |  4,419.30 us |  84.049 us |  78.620 us |  0.26 |    0.01 | | UseVectorsParallel | 4096  |  4,000.12 us |  44.611 us |  39.546 us |  0.24 |    0.00 | | UseCopy            | 4096  |  4,608.49 us |  33.594 us |  31.424 us |  0.27 |    0.00 | | UseCopyParallel    | 4096  |  3,960.86 us |  47.334 us |  44.276 us |  0.23 |    0.00 | 
  • Scalar: 标量算法。
  • ScalarParallel: 并发的标量算法。
  • UseVectors: 向量算法。
  • UseVectorsParallel: 并发的向量算法。
  • UseCopy: 使用“Buffer.MemoryCopy”的算法。
  • UseCopyParallel: 并发的使用“Buffer.MemoryCopy”的算法。

5.2 Arm 架构

同样的源代码可以在 Arm 架构上运行。基准测试结果如下。

BenchmarkDotNet v0.14.0, macOS Sequoia 15.0.1 (24A348) [Darwin 24.0.0] Apple M2, 1 CPU, 8 logical and 8 physical cores .NET SDK 8.0.204   [Host]     : .NET 8.0.4 (8.0.424.16909), Arm64 RyuJIT AdvSIMD [AttachedDebugger]   DefaultJob : .NET 8.0.4 (8.0.424.16909), Arm64 RyuJIT AdvSIMD   | Method             | Width | Mean         | Error      | StdDev    | Ratio | RatioSD | |------------------- |------ |-------------:|-----------:|----------:|------:|--------:| | Scalar             | 1024  |  1,227.25 us |   0.694 us |  0.649 us |  1.00 |    0.00 | | ScalarParallel     | 1024  |    261.38 us |   0.739 us |  0.617 us |  0.21 |    0.00 | | UseVectors         | 1024  |    117.96 us |   0.105 us |  0.098 us |  0.10 |    0.00 | | UseVectorsParallel | 1024  |     39.46 us |   0.297 us |  0.263 us |  0.03 |    0.00 | | UseCopy            | 1024  |     92.95 us |   0.081 us |  0.063 us |  0.08 |    0.00 | | UseCopyParallel    | 1024  |     34.90 us |   0.170 us |  0.159 us |  0.03 |    0.00 | |                    |       |              |            |           |       |         | | Scalar             | 2048  |  5,236.47 us |  69.941 us | 62.001 us |  1.00 |    0.02 | | ScalarParallel     | 2048  |    952.35 us |   3.270 us |  3.059 us |  0.18 |    0.00 | | UseVectors         | 2048  |    700.91 us |   4.339 us |  4.058 us |  0.13 |    0.00 | | UseVectorsParallel | 2048  |    254.35 us |   1.183 us |  1.107 us |  0.05 |    0.00 | | UseCopy            | 2048  |    757.75 us |  14.775 us | 25.485 us |  0.14 |    0.01 | | UseCopyParallel    | 2048  |    252.87 us |   1.078 us |  1.009 us |  0.05 |    0.00 | |                    |       |              |            |           |       |         | | Scalar             | 4096  | 20,257.16 us | 100.815 us | 84.185 us |  1.00 |    0.01 | | ScalarParallel     | 4096  |  3,728.60 us |  12.672 us | 11.233 us |  0.18 |    0.00 | | UseVectors         | 4096  |  2,788.68 us |   2.712 us |  2.404 us |  0.14 |    0.00 | | UseVectorsParallel | 4096  |  1,776.71 us |   1.510 us |  1.412 us |  0.09 |    0.00 | | UseCopy            | 4096  |  2,448.65 us |   4.232 us |  3.959 us |  0.12 |    0.00 | | UseCopyParallel    | 4096  |  1,796.17 us |   5.197 us |  4.861 us |  0.09 |    0.00 | 

5.3 .NET Framework

同样的源代码可以在 .NET Framework 上运行。基准测试结果如下。

BenchmarkDotNet v0.14.0, Windows 11 (10.0.22631.4541/23H2/2023Update/SunValley3) AMD Ryzen 7 7840H w/ Radeon 780M Graphics, 1 CPU, 16 logical and 8 physical cores   [Host]     : .NET Framework 4.8.1 (4.8.9282.0), X64 RyuJIT VectorSize=256   DefaultJob : .NET Framework 4.8.1 (4.8.9282.0), X64 RyuJIT VectorSize=256   | Method             | Width | Mean         | Error      | StdDev     | Ratio | RatioSD | Code Size | |------------------- |------ |-------------:|-----------:|-----------:|------:|--------:|----------:| | Scalar             | 1024  |  1,062.91 us |  14.426 us |  12.788 us |  1.00 |    0.02 |   2,891 B | | ScalarParallel     | 1024  |    183.82 us |   3.609 us |   4.296 us |  0.17 |    0.00 |   2,894 B | | UseVectors         | 1024  |     71.65 us |   1.420 us |   1.328 us |  0.07 |    0.00 |   3,602 B | | UseVectorsParallel | 1024  |     24.67 us |   0.471 us |   0.579 us |  0.02 |    0.00 |   3,605 B | | UseCopy            | 1024  |     82.86 us |   1.653 us |   2.262 us |  0.08 |    0.00 |   3,280 B | | UseCopyParallel    | 1024  |     24.16 us |   0.481 us |   0.659 us |  0.02 |    0.00 |   3,283 B | |                    |       |              |            |            |       |         |           | | Scalar             | 2048  |  4,344.08 us |  68.246 us |  60.498 us |  1.00 |    0.02 |   2,891 B | | ScalarParallel     | 2048  |    681.94 us |  12.532 us |  11.722 us |  0.16 |    0.00 |   2,894 B | | UseVectors         | 2048  |    981.58 us |  14.816 us |  13.134 us |  0.23 |    0.00 |   3,602 B | | UseVectorsParallel | 2048  |    429.28 us |   8.360 us |  16.106 us |  0.10 |    0.00 |   3,605 B | | UseCopy            | 2048  |    978.79 us |  15.720 us |  13.127 us |  0.23 |    0.00 |   3,280 B | | UseCopyParallel    | 2048  |    438.06 us |   8.691 us |  15.672 us |  0.10 |    0.00 |   3,283 B | |                    |       |              |            |            |       |         |           | | Scalar             | 4096  | 17,306.43 us | 343.417 us | 352.664 us |  1.00 |    0.03 |   2,891 B | | ScalarParallel     | 4096  |  3,717.65 us |  18.424 us |  17.233 us |  0.21 |    0.00 |   2,894 B | | UseVectors         | 4096  |  4,451.39 us |  84.848 us |  87.132 us |  0.26 |    0.01 |   3,602 B | | UseVectorsParallel | 4096  |  3,818.66 us |  24.223 us |  22.658 us |  0.22 |    0.00 |   3,605 B | | UseCopy            | 4096  |  4,721.90 us |  88.960 us |  83.214 us |  0.27 |    0.01 |   3,280 B | | UseCopyParallel    | 4096  |  3,820.63 us |  19.312 us |  18.065 us |  0.22 |    0.00 |   3,283 B | 

六、并行处理收益极少的原因分析

6.1 观察测试结果,产生疑问

观察测试结果,首先能发现这2点情况——

  • UseVectors与UseCopy的测试结果非常接近,且 UseVectorsParallel的测试结果非常接近。这是因为“Buffer.MemoryCopy”内部也使用了向量算法来进行优化。
  • 对于向量算法(UseVectors、UseCopy等)来说,.NET Framework.NET 8.0 的测试结果非常接近。这是因为垂直翻转仅需要做复制操作,这是非常简单的操作。于是早在 .NET Framework 时代,“Buffer.MemoryCopy”已做好了向量化优化。

随后会发现一个奇怪的地方——并行版算法(Parallel)并没有比单线程算法快多少。尤其是对于4096宽度的图片,16个线程并行处理时,仍需花费 90%左右的时间,并行处理的收益极少。

这很奇怪,因为垂直翻转是可以分解为各行去处理的,非常适合并行化。且仅需做简单的内存复制操作,没有复杂的算术运算。16个线程并行处理时,理论上仅需要 1/16 的时间。但测试结果却是 ——仍需花费 90%左右的时间。

再来看看一下Arm架构的测试结果,会发现也存在并行处理收益极少的现象。但是要稍微好一点,例如。

  • UseCopy:2,448.65 us
  • UseCopyParallel:1,796.17 us

1796.17 / 2448.65 ≈ 0.7335。即只需 73.35 % 的时间,但这也比理论值差了不少。

6.2 数据分析

在Width为 4096时,图像高度也是 4096,且使用的是 24位 图像。那么一个图像的总字节数是 4096*4096*3, 即 48MB 左右。

根据UseCopyParallel的结果,来计算一下每秒的处理速率——

  • X86: 48 / 3.96086 ≈ 12.1185 (GB/s)
  • Arm: 48 / 1.79617 ≈ 26.7235 (GB/s)

从上面的数据来看,Arm的吞吐率是相对于 X86的倍数是:26.7235/12.1185 ≈ 2.2052。

6.3 找到原因

我这台Arm计算机,CPU是“Appla M2”。查了资料,发现M2具有“100GB/s统一内存带宽”。

而我的X86电脑,用的是DDR5-5600内存条。DDR每次传输64位数据,故内存带宽为 5600 * 64 / 8 = 44800 (MB/s),即 44.8 GB/s。

我们再来计算一下倍数:100 / 44.8 ≈ 2.2321。

“2.2321”与“2.2052”非常接近。这表示现在遇到的是——内存带宽的瓶颈。

这表示在4096时,我们的算法已经基本占满了内存带宽。此时虽然CPU有10多个线程,理论上很多算力还没发挥出来,但由于内存带宽已占满了,故止步于这个值。

由于是“一读一写”,程序需要2次访问内存——

  • X86: 12.1185*2 = 24.237 (GB/s) (理论上限是 44.8)
  • Arm: 26.7235*2 = 53.447 (GB/s) (理论上限是 100)

可以看出,程序已经达到内存带宽理论上限的60%左右了。即我们的垂直翻转算法,已经是足够优秀了。

6.4 进一步优化的空间

其实程序是还可以进一步优化的,例如使用 地址对齐、预取、非时间性(NonTemporal)写入、循环展开 等手段。但这些手段比较复杂,编码难度大,更会使程序的可读性大为降低。而且这些手段是特定硬件型号敏感的,例如更换了CPU型号后,很多细节得重新调校。

更关键的是,进一步优化的空间已经非常少了,离理论上限只有40%左右差距。就算达到理论上限,性能也仅提高这 40%左右,没法翻倍。仅当硬件型号固定,且有充足理由时,此时才可考虑做进一步的优化处理。

这也给了我们一个提醒:在编写向量化算法时,为了避免遇到内存带宽瓶颈,程序应尽量少的读写内存。例如使用向量类型进行批量读取,随后尽量将数据在向量类型里处理好,最后使用向量类型进行批量写入。

因为在使用向量类型时,是在CPU内的向量寄存器(Register)中处理的。寄存器与CPU同频工作,吞吐率比内存高了几个数量级。寄存器不够用时,一般会挪到Cache(高速缓存)上进行周转,Cache的速度也比内存快很多。

附录