การเพิ่มประสิทธิภาพสูงสุดของคอลัมน์ด้วย SIMD

Aug 15 2020

ฉันมีฟังก์ชั่นนี้ที่ฉันใช้เวลาเป็นจำนวนมากในโค้ดของฉันและฉันต้องการปรับให้เหมาะสมโดย vectorization-SIMD-compiler intrinsics ถ้าเป็นไปได้

โดยพื้นฐานแล้วจะค้นหาค่าและตำแหน่งของค่าสูงสุดบนเมทริกซ์เหนือคอลัมน์และเก็บไว้:

  • val_ptr: เมทริกซ์อินพุต: คอลัมน์หลัก (สไตล์ฟอร์แทรน) n_rows-by-n_cols (โดยทั่วไปคือ n_rows >> n_cols)
  • opt_pos_ptr: เวกเตอร์ int ของความยาว n_rows ที่เก็บตำแหน่งของค่าสูงสุด ในรายการที่เต็มไปด้วยศูนย์
  • max_ptr: เวกเตอร์ลอยของความยาว n_rows ที่เก็บค่าสูงสุด ในรายการเต็มไปด้วยสำเนาของคอลัมน์แรกของ val_ptr
  • ฟังก์ชันจะถูกเรียกใช้ในลูปคู่ขนาน
  • พื้นที่หน่วยความจำรับประกันว่าจะไม่ทับซ้อนกัน
  • ฉันไม่ต้องการเติม max_ptr จริงๆตอนนี้ใช้สำหรับเก็บหนังสือและเพื่อหลีกเลี่ยงการจัดสรรหน่วยความจำ
  • ฉันใช้ MSVC, C ++ 17 บน Windows 10 หมายถึงการเรียกใช้ซีพียู Intel ที่ทันสมัย

รหัสที่ประเภทแม่แบบหมายถึงลอยหรือสองครั้ง:

template <typename eT>
find_max(const int n_cols, 
         const int n_rows, 
         const eT* val_ptr,
         int* opt_pos_ptr,
         eT* max_ptr){
    for (int col = 1; col < n_cols; ++col)
    {
        //Getting the pointer to the beginning of the column
        const auto* value_col = val_ptr + col * n_rows;
        //Looping over the rows
        for (int row = 0; row < n_rows; ++row)
        {
            //If the value is larger than the current maximum, we replace and we store its positions
            if (value_col[row] > max_ptr[row])
            {
                max_ptr[row] = value_col[row];
                opt_pos_ptr[row] = col;
            }
        }
    }
}

สิ่งที่ฉันพยายามจนถึงตอนนี้:

  • ฉันพยายามใช้ OpenMP แบบขนานสำหรับวงใน แต่นำบางสิ่งมาเฉพาะในแถวที่ใหญ่มากซึ่งมีขนาดใหญ่กว่าการใช้งานปัจจุบันของฉันเล็กน้อย
  • if in inner loop ป้องกันไม่ให้ #pragma omp simd ทำงานและฉันไม่สามารถเขียนซ้ำได้หากไม่มีมัน

คำตอบ

3 AndreySemashev Aug 15 2020 at 21:55

จากตัวอย่างโค้ดที่คุณโพสต์ดูเหมือนว่าคุณต้องการคำนวณค่าสูงสุดแนวตั้งซึ่งหมายความว่าในกรณีของคุณ "คอลัมน์" เป็นแนวนอน ในลำดับขององค์ประกอบแนวนอน C / C ++ (เช่นที่สององค์ประกอบที่อยู่ติดกันมีระยะห่างขององค์ประกอบหนึ่งในหน่วยความจำ) โดยปกติจะเรียกว่าแถวและแนวตั้ง (โดยที่สององค์ประกอบที่อยู่ติดกันมีระยะห่างของขนาดแถวในหน่วยความจำ) - คอลัมน์ ในคำตอบของฉันด้านล่างฉันจะใช้คำศัพท์ดั้งเดิมโดยที่แถวอยู่ในแนวนอนและคอลัมน์เป็นแนวตั้ง

นอกจากนี้สำหรับความกะทัดรัดผมจะมุ่งเน้นไปที่ความเป็นไปได้ชนิดหนึ่งขององค์ประกอบเมทริกซ์ float- แนวคิดพื้นฐานเหมือนกันdoubleโดยมีความแตกต่างหลักคือจำนวนองค์ประกอบต่อเวกเตอร์และการเลือก_ps/ อินท_pdรินนิกส์ ฉันจะจัดเตรียมเวอร์ชันไว้ให้doubleในตอนท้าย


ความคิดคือการที่คุณสามารถคำนวณสูงสุดในแนวตั้งสำหรับคอลัมน์หลายขนานโดยใช้/_mm_max_ps _mm_max_pdในการบันทึกตำแหน่งของค่าสูงสุดที่พบคุณสามารถเปรียบเทียบค่าสูงสุดก่อนหน้ากับองค์ประกอบปัจจุบันได้ ผลลัพธ์ของการเปรียบเทียบคือมาสก์ซึ่งองค์ประกอบเป็นองค์ประกอบทั้งหมดที่มีการอัปเดตสูงสุด มาสก์นั้นสามารถใช้เพื่อเลือกตำแหน่งที่ต้องการอัปเดตได้เช่นกัน

ฉันต้องทราบว่าอัลกอริทึมด้านล่างถือว่าไม่สำคัญว่าตำแหน่งใดขององค์ประกอบสูงสุดจะถูกบันทึกหากมีองค์ประกอบสูงสุดที่เท่ากันหลายรายการในคอลัมน์ นอกจากนี้ฉันถือว่าเมทริกซ์ไม่มีค่า NaN ซึ่งจะส่งผลต่อการเปรียบเทียบ เพิ่มเติมในภายหลัง

void find_max(const int n_cols, 
         const int n_rows, 
         const float* val_ptr,
         int* opt_pos_ptr,
         float* max_ptr){
    const __m128i mm_one = _mm_set1_epi32(1);

    // Pre-compute the number of rows that can be processed in full vector width.
    // In a 128-bit vector there are 4 floats or 2 doubles
    int tail_size = n_rows & 3;
    int n_rows_aligned = n_rows - tail_size;
    int row = 0;
    for (; row < n_rows_aligned; row += 4)
    {
        const auto* col_ptr = val_ptr + row;
        __m128 mm_max = _mm_loadu_ps(col_ptr);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128 mm_value = _mm_loadu_ps(col_ptr);

            // See if this value is greater than the old maximum
            __m128 mm_mask = _mm_cmplt_ps(mm_max, mm_value);
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, _mm_castps_si128(mm_mask));

            // Compute the maximum
            mm_max = _mm_max_ps(mm_value, mm_max);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_ps(max_ptr + row, mm_max);
        _mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
    }

    // Process tail serially
    for (; row < n_rows; ++row)
    {
        const auto* col_ptr = val_ptr + row;
        auto max = *col_ptr;
        int max_pos = 0;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            auto value = *col_ptr;
            if (value > max)
            {
                max = value;
                max_pos = col;
            }

            col_ptr += n_rows;
        }

        max_ptr[row] = max;
        opt_pos_ptr[row] = max_pos;
    }
}

รหัสด้านบนต้องการ SSE4.1 เนื่องจากการผสมผสานภายใน คุณสามารถแทนที่ด้วยชุดค่าผสมของ_mm_and_si128/ _ps, _mm_andnot_si128/ _psและ_mm_or_si128/ _psซึ่งในกรณีนี้ข้อกำหนดจะถูกลดระดับเป็น SSE2 ดูIntel Intrinsics Guideสำหรับรายละเอียดเพิ่มเติมเกี่ยวกับ Intrinsics เฉพาะรวมถึงส่วนขยายชุดคำสั่งที่ต้องการ


หมายเหตุเกี่ยวกับค่า NaN หากเมทริกซ์ของคุณสามารถมี NaN ได้การ_mm_cmplt_psทดสอบจะแสดงผลเท็จเสมอ สำหรับ_mm_max_psโดยทั่วไปแล้วไม่ทราบว่ามันจะกลับมาเป็นอย่างไรmaxpsคำสั่งที่แปลที่แท้จริงเพื่อผลตอบแทนที่สองของ (ต้นฉบับ) ถูกดำเนินการอย่างใดอย่างหนึ่งถ้าตัวถูกดำเนินการเป็นน่านดังนั้นโดยการจัดเรียงตัวถูกดำเนินการเรียนการสอนที่คุณสามารถบรรลุพฤติกรรมอย่างใดอย่างหนึ่ง อย่างไรก็ตามไม่มีการจัดทำเป็นเอกสารว่าอาร์กิวเมนต์ของอินท_mm_max_psรินซิกใดแสดงถึงตัวถูกดำเนินการของคำสั่งใดและอาจเป็นไปได้ว่าคอมไพเลอร์อาจใช้การเชื่อมโยงที่แตกต่างกันในกรณีที่แตกต่างกัน ดูคำตอบนี้สำหรับรายละเอียดเพิ่มเติม

เพื่อให้แน่ใจว่าพฤติกรรมที่ถูกต้อง wrt. NaN คุณสามารถใช้แอสเซมเบลอร์แบบอินไลน์เพื่อบังคับลำดับที่ถูกต้องของmaxpsตัวถูกดำเนินการ น่าเสียดายที่นั่นไม่ใช่ตัวเลือกสำหรับ MSVC สำหรับเป้าหมาย x86-64 ซึ่งคุณบอกว่าคุณกำลังใช้อยู่ดังนั้นคุณสามารถนำ_mm_cmplt_psผลลัพธ์กลับมาใช้ใหม่สำหรับการผสมผสานครั้งที่สองได้เช่นนี้:

// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, mm_mask);

สิ่งนี้จะยับยั้ง NaN ในค่าสูงสุดที่เป็นผลลัพธ์ หากคุณต้องการเก็บ NaN ไว้แทนคุณสามารถใช้การเปรียบเทียบครั้งที่สองเพื่อตรวจจับ NaN:

// Detect NaNs
__m128 mm_nan_mask = _mm_cmpunord_ps(mm_value, mm_value);

// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, _mm_or_ps(mm_mask, mm_nan_mask));

คุณอาจปรับปรุงประสิทธิภาพของอัลกอริทึมด้านบนเพิ่มเติมได้หากคุณใช้เวกเตอร์ที่กว้างขึ้น ( __m256หรือ__m512) และคลายการวนรอบนอกด้วยปัจจัยเล็ก ๆ เพื่อให้บรรทัดแคชของข้อมูลแถวถูกโหลดในทุก ๆ การวนซ้ำของวงใน


นี่คือตัวอย่างของการใช้งานสำหรับdouble. ประเด็นสำคัญที่ควรทราบก็คือเนื่องจากมีเพียงสองdoubleองค์ประกอบต่อเวกเตอร์และยังมีสี่ตำแหน่งต่อเวกเตอร์เราจึงต้องคลายวงรอบนอกเพื่อประมวลผลเวกเตอร์สองdoubleตัวพร้อมกันจากนั้นบีบอัดทั้งสองมาสก์จากการเปรียบเทียบกับ ค่าสูงสุดก่อนหน้าเพื่อผสมผสานตำแหน่ง 32 บิต

void find_max(const int n_cols, 
         const int n_rows, 
         const double* val_ptr,
         int* opt_pos_ptr,
         double* max_ptr){
    const __m128i mm_one = _mm_set1_epi32(1);

    // Pre-compute the number of rows that can be processed in full vector width.
    // In a 128-bit vector there are 2 doubles, but we want to process
    // two vectors at a time.
    int tail_size = n_rows & 3;
    int n_rows_aligned = n_rows - tail_size;
    int row = 0;
    for (; row < n_rows_aligned; row += 4)
    {
        const auto* col_ptr = val_ptr + row;
        __m128d mm_max1 = _mm_loadu_pd(col_ptr);
        __m128d mm_max2 = _mm_loadu_pd(col_ptr + 2);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128d mm_value1 = _mm_loadu_pd(col_ptr);
            __m128d mm_value2 = _mm_loadu_pd(col_ptr + 2);

            // See if this value is greater than the old maximum
            __m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
            __m128d mm_mask2 = _mm_cmplt_pd(mm_max2, mm_value2);
            // Compress the 2 masks into one
            __m128i mm_mask = _mm_packs_epi32(
                _mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask2));
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);

            // Compute the maximum
            mm_max1 = _mm_max_pd(mm_value1, mm_max1);
            mm_max2 = _mm_max_pd(mm_value2, mm_max2);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_pd(max_ptr + row, mm_max1);
        _mm_storeu_pd(max_ptr + row + 2, mm_max2);
        _mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
    }

    // Process 2 doubles at once
    if (tail_size >= 2)
    {
        const auto* col_ptr = val_ptr + row;
        __m128d mm_max1 = _mm_loadu_pd(col_ptr);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128d mm_value1 = _mm_loadu_pd(col_ptr);

            // See if this value is greater than the old maximum
            __m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
            // Compress the mask. The upper half doesn't matter.
            __m128i mm_mask = _mm_packs_epi32(
                _mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask1));
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);

            // Compute the maximum
            mm_max1 = _mm_max_pd(mm_value1, mm_max1);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_pd(max_ptr + row, mm_max1);
        // Only store the lower two positions
        _mm_storel_epi64(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);

        row += 2;
    }

    // Process tail serially
    for (; row < n_rows; ++row)
    {
        const auto* col_ptr = val_ptr + row;
        auto max = *col_ptr;
        int max_pos = 0;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            auto value = *col_ptr;
            if (value > max)
            {
                max = value;
                max_pos = col;
            }

            col_ptr += n_rows;
        }

        max_ptr[row] = max;
        opt_pos_ptr[row] = max_pos;
    }
}