ใช้เธรด OpenMP และ std: :( ​​การทดลอง: :) simd เพื่อคำนวณชุด Mandelbrot

Aug 19 2020

ฉันต้องการใช้พล็อตเตอร์ชุด Mandelbrot แบบง่ายๆโดยใช้กระบวนทัศน์ HPC ประเภทต่างๆแสดงจุดแข็งและจุดอ่อนของพวกเขาและการนำไปใช้นั้นง่ายหรือยากเพียงใด ลองนึกถึง GPGPU (CUDA / OpenACC / OpenMP4.5), threading / OpenMP และ MPI และใช้ตัวอย่างเหล่านี้เพื่อให้โปรแกรมเมอร์ที่เพิ่งเริ่มใช้ HPC เป็นผู้ดูแลและดูว่าความเป็นไปได้คืออะไร ความชัดเจนของโค้ดมีความสำคัญมากกว่าการได้รับประสิทธิภาพสูงสุดจากฮาร์ดแวร์นั่นคือขั้นตอนที่สอง;)

เนื่องจากปัญหาเป็นเรื่องเล็กน้อยในการทำแบบขนานและซีพียูสมัยใหม่สามารถได้รับประสิทธิภาพจำนวนมากโดยใช้คำแนะนำเวกเตอร์ฉันจึงต้องการรวม OpenMP และ SIMD ด้วย น่าเสียดายที่การเพิ่ม a #pragma omp simdไม่ให้ผลลัพธ์ที่น่าพอใจและการใช้ intrinsics นั้นไม่เป็นมิตรต่อผู้ใช้หรือเป็นข้อพิสูจน์ในอนาคต หรือสวย .

Fortunately, work is being done to the C++ standard such that it should be easier to generically implement vector instructions, as mentioned in the TS: "Extensions for parallelism, version 2", specifically section 9 on data-parallel types. A WIP implementation can be found here, which is based on VC which can be found here.

Assume that I have the following class (which has been changed to make it a bit simpler)

#include <stddef.h>

using Range = std::pair<double, double>;
using Resolution = std::pair<std::size_t, std::size_t>;

class Mandelbrot
{
    double* d_iters;
    Range d_xrange;
    Range d_yrange;
    Resolution d_res;
    std::size_t d_maxIter;
    
public:
    Mandelbrot(Range xrange, Range yrange, Resolution res, std::size_t maxIter);
    ~Mandelbrot();

    void writeImage(std::string const& fileName);
    void computeMandelbrot();
private:
    void calculateColors();
}; 

And the following implementation of computeMandelbrot() using OpenMP

void Mandelbrot::computeMandelbrot()
{
    double dx = (d_xrange.second - d_xrange.first) / d_res.first;
    double dy = (d_yrange.second - d_yrange.first) / d_res.second;

    #pragma omp parallel for schedule(dynamic)
    for (std::size_t row = 0; row != d_res.second; ++row)
    {
        double c_imag = d_yrange.first + row * dy;
        for (std::size_t col = 0; col != d_res.first; ++col)
        {
            double real = 0.0;
            double imag = 0.0;
            double realSquared = 0.0;
            double imagSquared = 0.0;
            double c_real = d_xrange.first + col * dx;

            std::size_t iter = 0;
            while (iter < d_maxIter && realSquared + imagSquared < 4.0)
            {
                realSquared = real * real;
                imagSquared = imag * imag;
                imag = 2 * real * imag + c_imag;
                real = realSquared - imagSquared + c_real;
                ++iter;
            }
            d_iters[row * d_res.first + col] = iter;
        }   
    }
}

We can assume that the resolutions both x and y directions are multiples of 2/4/8/.., depending on which SIMD instructions we use.

Unfortunately, there is very little information available online on std::experimental::simd. Nor any non-trivial examples as far as I could find.

In the Vc git repository, there is an implementation of the Mandelbrot set calculator, but it's quite convoluted and due to the lack of comments rather difficult to follow.

It is clear that I should change the data types of the doubles in the function computeMandelbrot(), but I'm unsure to what. The TS mentions two main new data types for some type T,

native_simd = std::experimental::simd<T, std::experimental::simd_abi::native>;

and

fixed_size_simd = std::experimental::simd<T, std::experimental::simd_abi::fixed_size<N>>;

Using native_simd makes the most sense, since I don't know my bounds at compile time. But then it is not clear to me what these types represent, is a native_simd<double> a single double or is it a collection of doubles on which a vector instruction is executed? And then how many doubles are in this collection?

If somebody could point me to examples where these concepts are used, or give me some pointers on how to implement vector instructions using std::experimental::simd, I would be very grateful.

คำตอบ

NigelOvermars Aug 24 2020 at 19:45

Here is a very basic implementation, which works (as far as I can tell). The testing which elements of the vector have absolute value larger than 2 is done in a very cumbersome and inefficient way. There must be a better way to do this, but I haven't found it yet.

I get about a 72% performance increase on a AMD Ryzen 5 3600 and giving g++ the option -march=znver2, which is less than expected.

template <class T>
void mandelbrot(T xstart, T xend,
                T ystart, T yend)
{
    namespace stdx = std::experimental;

    constexpr auto simdSize = stdx::native_simd<T>().size();
    constexpr unsigned size = 4096;
    constexpr unsigned maxIter = 250;

    assert(size % simdSize == 0);
    unsigned* res = new unsigned[size * size];

    T dx = (xend - xstart) / size;
    T dy = (yend - ystart) / size;

    for (std::size_t row = 0; row != size; ++row)
    {
        T c_imag = ystart + row * dy;
        for (std::size_t col = 0; col != size; col += simdSize)
        {
            stdx::native_simd<T> real{0};
            stdx::native_simd<T> imag{0};
            stdx::native_simd<T> realSquared{0};
            stdx::native_simd<T> imagSquared{0};
            stdx::fixed_size_simd<unsigned, simdSize> iters{0};

            stdx::native_simd<T> c_real;
            for (int idx = 0; idx != simdSize; ++idx)
            {
                c_real[idx] = xstart + (col + idx) * dx;
            }

            for (unsigned iter = 0; iter != maxIter; ++iter)
            {
                realSquared = real * real;
                imagSquared = imag * imag;
                auto isInside = realSquared + imagSquared > stdx::native_simd<T>{4};
                for (int idx = 0; idx != simdSize; ++idx)
                {
                    // if not bigger than 4, increase iters
                    if (!isInside[idx])
                    {
                        iters[idx] += 1;
                    }
                    else
                    {
                        // prevent that they become inf/nan
                        real[idx] = static_cast<T>(4);
                        imag[idx] = static_cast<T>(4);
                    }
                }

                if (stdx::all_of(isInside) )
                {
                    break;
                }        

                imag = static_cast<T>(2.0) * real * imag + c_imag;
                real = realSquared - imagSquared + c_real;
            }
            iters.copy_to(res + row * size + col, stdx::element_aligned);
        }

    }
    delete[] res;
}

The whole testing code (starting from auto test = (...)) compiles down to

  .L9:
  vmulps ymm1, ymm1, ymm1
  vmulps ymm13, ymm2, ymm2
  xor eax, eax
  vaddps ymm2, ymm13, ymm1
  vcmpltps ymm2, ymm5, ymm2
  vmovaps YMMWORD PTR [rsp+160], ymm2
  jmp .L6
.L3:
  vmovss DWORD PTR [rsp+32+rax], xmm0
  vmovss DWORD PTR [rsp+64+rax], xmm0
  add rax, 4
  cmp rax, 32
  je .L22
.L6:
  vucomiss xmm3, DWORD PTR [rsp+160+rax]
  jp .L3
  jne .L3
  inc DWORD PTR [rsp+96+rax]
  add rax, 4
  cmp rax, 32
  jne .L6