การเพิ่มประสิทธิภาพสูงสุดของคอลัมน์ด้วย SIMD
ฉันมีฟังก์ชั่นนี้ที่ฉันใช้เวลาเป็นจำนวนมากในโค้ดของฉันและฉันต้องการปรับให้เหมาะสมโดย vectorization-SIMD-compiler intrinsics ถ้าเป็นไปได้
โดยพื้นฐานแล้วจะค้นหาค่าและตำแหน่งของค่าสูงสุดบนเมทริกซ์เหนือคอลัมน์และเก็บไว้:
- val_ptr: เมทริกซ์อินพุต: คอลัมน์หลัก (สไตล์ฟอร์แทรน) n_rows-by-n_cols (โดยทั่วไปคือ n_rows >> n_cols)
- opt_pos_ptr: เวกเตอร์ int ของความยาว n_rows ที่เก็บตำแหน่งของค่าสูงสุด ในรายการที่เต็มไปด้วยศูนย์
- max_ptr: เวกเตอร์ลอยของความยาว n_rows ที่เก็บค่าสูงสุด ในรายการเต็มไปด้วยสำเนาของคอลัมน์แรกของ val_ptr
- ฟังก์ชันจะถูกเรียกใช้ในลูปคู่ขนาน
- พื้นที่หน่วยความจำรับประกันว่าจะไม่ทับซ้อนกัน
- ฉันไม่ต้องการเติม max_ptr จริงๆตอนนี้ใช้สำหรับเก็บหนังสือและเพื่อหลีกเลี่ยงการจัดสรรหน่วยความจำ
- ฉันใช้ MSVC, C ++ 17 บน Windows 10 หมายถึงการเรียกใช้ซีพียู Intel ที่ทันสมัย
รหัสที่ประเภทแม่แบบหมายถึงลอยหรือสองครั้ง:
template <typename eT>
find_max(const int n_cols,
const int n_rows,
const eT* val_ptr,
int* opt_pos_ptr,
eT* max_ptr){
for (int col = 1; col < n_cols; ++col)
{
//Getting the pointer to the beginning of the column
const auto* value_col = val_ptr + col * n_rows;
//Looping over the rows
for (int row = 0; row < n_rows; ++row)
{
//If the value is larger than the current maximum, we replace and we store its positions
if (value_col[row] > max_ptr[row])
{
max_ptr[row] = value_col[row];
opt_pos_ptr[row] = col;
}
}
}
}
สิ่งที่ฉันพยายามจนถึงตอนนี้:
- ฉันพยายามใช้ OpenMP แบบขนานสำหรับวงใน แต่นำบางสิ่งมาเฉพาะในแถวที่ใหญ่มากซึ่งมีขนาดใหญ่กว่าการใช้งานปัจจุบันของฉันเล็กน้อย
- if in inner loop ป้องกันไม่ให้ #pragma omp simd ทำงานและฉันไม่สามารถเขียนซ้ำได้หากไม่มีมัน
คำตอบ
จากตัวอย่างโค้ดที่คุณโพสต์ดูเหมือนว่าคุณต้องการคำนวณค่าสูงสุดแนวตั้งซึ่งหมายความว่าในกรณีของคุณ "คอลัมน์" เป็นแนวนอน ในลำดับขององค์ประกอบแนวนอน C / C ++ (เช่นที่สององค์ประกอบที่อยู่ติดกันมีระยะห่างขององค์ประกอบหนึ่งในหน่วยความจำ) โดยปกติจะเรียกว่าแถวและแนวตั้ง (โดยที่สององค์ประกอบที่อยู่ติดกันมีระยะห่างของขนาดแถวในหน่วยความจำ) - คอลัมน์ ในคำตอบของฉันด้านล่างฉันจะใช้คำศัพท์ดั้งเดิมโดยที่แถวอยู่ในแนวนอนและคอลัมน์เป็นแนวตั้ง
นอกจากนี้สำหรับความกะทัดรัดผมจะมุ่งเน้นไปที่ความเป็นไปได้ชนิดหนึ่งขององค์ประกอบเมทริกซ์ float
- แนวคิดพื้นฐานเหมือนกันdouble
โดยมีความแตกต่างหลักคือจำนวนองค์ประกอบต่อเวกเตอร์และการเลือก_ps
/ อินท_pd
รินนิกส์ ฉันจะจัดเตรียมเวอร์ชันไว้ให้double
ในตอนท้าย
ความคิดคือการที่คุณสามารถคำนวณสูงสุดในแนวตั้งสำหรับคอลัมน์หลายขนานโดยใช้/_mm_max_ps
_mm_max_pd
ในการบันทึกตำแหน่งของค่าสูงสุดที่พบคุณสามารถเปรียบเทียบค่าสูงสุดก่อนหน้ากับองค์ประกอบปัจจุบันได้ ผลลัพธ์ของการเปรียบเทียบคือมาสก์ซึ่งองค์ประกอบเป็นองค์ประกอบทั้งหมดที่มีการอัปเดตสูงสุด มาสก์นั้นสามารถใช้เพื่อเลือกตำแหน่งที่ต้องการอัปเดตได้เช่นกัน
ฉันต้องทราบว่าอัลกอริทึมด้านล่างถือว่าไม่สำคัญว่าตำแหน่งใดขององค์ประกอบสูงสุดจะถูกบันทึกหากมีองค์ประกอบสูงสุดที่เท่ากันหลายรายการในคอลัมน์ นอกจากนี้ฉันถือว่าเมทริกซ์ไม่มีค่า NaN ซึ่งจะส่งผลต่อการเปรียบเทียบ เพิ่มเติมในภายหลัง
void find_max(const int n_cols,
const int n_rows,
const float* val_ptr,
int* opt_pos_ptr,
float* max_ptr){
const __m128i mm_one = _mm_set1_epi32(1);
// Pre-compute the number of rows that can be processed in full vector width.
// In a 128-bit vector there are 4 floats or 2 doubles
int tail_size = n_rows & 3;
int n_rows_aligned = n_rows - tail_size;
int row = 0;
for (; row < n_rows_aligned; row += 4)
{
const auto* col_ptr = val_ptr + row;
__m128 mm_max = _mm_loadu_ps(col_ptr);
__m128i mm_max_pos = _mm_setzero_si128();
__m128i mm_pos = mm_one;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
__m128 mm_value = _mm_loadu_ps(col_ptr);
// See if this value is greater than the old maximum
__m128 mm_mask = _mm_cmplt_ps(mm_max, mm_value);
// If it is, save its position
mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, _mm_castps_si128(mm_mask));
// Compute the maximum
mm_max = _mm_max_ps(mm_value, mm_max);
mm_pos = _mm_add_epi32(mm_pos, mm_one);
col_ptr += n_rows;
}
// Store the results
_mm_storeu_ps(max_ptr + row, mm_max);
_mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
}
// Process tail serially
for (; row < n_rows; ++row)
{
const auto* col_ptr = val_ptr + row;
auto max = *col_ptr;
int max_pos = 0;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
auto value = *col_ptr;
if (value > max)
{
max = value;
max_pos = col;
}
col_ptr += n_rows;
}
max_ptr[row] = max;
opt_pos_ptr[row] = max_pos;
}
}
รหัสด้านบนต้องการ SSE4.1 เนื่องจากการผสมผสานภายใน คุณสามารถแทนที่ด้วยชุดค่าผสมของ_mm_and_si128
/ _ps
, _mm_andnot_si128
/ _ps
และ_mm_or_si128
/ _ps
ซึ่งในกรณีนี้ข้อกำหนดจะถูกลดระดับเป็น SSE2 ดูIntel Intrinsics Guideสำหรับรายละเอียดเพิ่มเติมเกี่ยวกับ Intrinsics เฉพาะรวมถึงส่วนขยายชุดคำสั่งที่ต้องการ
หมายเหตุเกี่ยวกับค่า NaN หากเมทริกซ์ของคุณสามารถมี NaN ได้การ_mm_cmplt_ps
ทดสอบจะแสดงผลเท็จเสมอ สำหรับ_mm_max_ps
โดยทั่วไปแล้วไม่ทราบว่ามันจะกลับมาเป็นอย่างไรmaxps
คำสั่งที่แปลที่แท้จริงเพื่อผลตอบแทนที่สองของ (ต้นฉบับ) ถูกดำเนินการอย่างใดอย่างหนึ่งถ้าตัวถูกดำเนินการเป็นน่านดังนั้นโดยการจัดเรียงตัวถูกดำเนินการเรียนการสอนที่คุณสามารถบรรลุพฤติกรรมอย่างใดอย่างหนึ่ง อย่างไรก็ตามไม่มีการจัดทำเป็นเอกสารว่าอาร์กิวเมนต์ของอินท_mm_max_ps
รินซิกใดแสดงถึงตัวถูกดำเนินการของคำสั่งใดและอาจเป็นไปได้ว่าคอมไพเลอร์อาจใช้การเชื่อมโยงที่แตกต่างกันในกรณีที่แตกต่างกัน ดูคำตอบนี้สำหรับรายละเอียดเพิ่มเติม
เพื่อให้แน่ใจว่าพฤติกรรมที่ถูกต้อง wrt. NaN คุณสามารถใช้แอสเซมเบลอร์แบบอินไลน์เพื่อบังคับลำดับที่ถูกต้องของmaxps
ตัวถูกดำเนินการ น่าเสียดายที่นั่นไม่ใช่ตัวเลือกสำหรับ MSVC สำหรับเป้าหมาย x86-64 ซึ่งคุณบอกว่าคุณกำลังใช้อยู่ดังนั้นคุณสามารถนำ_mm_cmplt_ps
ผลลัพธ์กลับมาใช้ใหม่สำหรับการผสมผสานครั้งที่สองได้เช่นนี้:
// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, mm_mask);
สิ่งนี้จะยับยั้ง NaN ในค่าสูงสุดที่เป็นผลลัพธ์ หากคุณต้องการเก็บ NaN ไว้แทนคุณสามารถใช้การเปรียบเทียบครั้งที่สองเพื่อตรวจจับ NaN:
// Detect NaNs
__m128 mm_nan_mask = _mm_cmpunord_ps(mm_value, mm_value);
// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, _mm_or_ps(mm_mask, mm_nan_mask));
คุณอาจปรับปรุงประสิทธิภาพของอัลกอริทึมด้านบนเพิ่มเติมได้หากคุณใช้เวกเตอร์ที่กว้างขึ้น ( __m256
หรือ__m512
) และคลายการวนรอบนอกด้วยปัจจัยเล็ก ๆ เพื่อให้บรรทัดแคชของข้อมูลแถวถูกโหลดในทุก ๆ การวนซ้ำของวงใน
นี่คือตัวอย่างของการใช้งานสำหรับdouble
. ประเด็นสำคัญที่ควรทราบก็คือเนื่องจากมีเพียงสองdouble
องค์ประกอบต่อเวกเตอร์และยังมีสี่ตำแหน่งต่อเวกเตอร์เราจึงต้องคลายวงรอบนอกเพื่อประมวลผลเวกเตอร์สองdouble
ตัวพร้อมกันจากนั้นบีบอัดทั้งสองมาสก์จากการเปรียบเทียบกับ ค่าสูงสุดก่อนหน้าเพื่อผสมผสานตำแหน่ง 32 บิต
void find_max(const int n_cols,
const int n_rows,
const double* val_ptr,
int* opt_pos_ptr,
double* max_ptr){
const __m128i mm_one = _mm_set1_epi32(1);
// Pre-compute the number of rows that can be processed in full vector width.
// In a 128-bit vector there are 2 doubles, but we want to process
// two vectors at a time.
int tail_size = n_rows & 3;
int n_rows_aligned = n_rows - tail_size;
int row = 0;
for (; row < n_rows_aligned; row += 4)
{
const auto* col_ptr = val_ptr + row;
__m128d mm_max1 = _mm_loadu_pd(col_ptr);
__m128d mm_max2 = _mm_loadu_pd(col_ptr + 2);
__m128i mm_max_pos = _mm_setzero_si128();
__m128i mm_pos = mm_one;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
__m128d mm_value1 = _mm_loadu_pd(col_ptr);
__m128d mm_value2 = _mm_loadu_pd(col_ptr + 2);
// See if this value is greater than the old maximum
__m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
__m128d mm_mask2 = _mm_cmplt_pd(mm_max2, mm_value2);
// Compress the 2 masks into one
__m128i mm_mask = _mm_packs_epi32(
_mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask2));
// If it is, save its position
mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);
// Compute the maximum
mm_max1 = _mm_max_pd(mm_value1, mm_max1);
mm_max2 = _mm_max_pd(mm_value2, mm_max2);
mm_pos = _mm_add_epi32(mm_pos, mm_one);
col_ptr += n_rows;
}
// Store the results
_mm_storeu_pd(max_ptr + row, mm_max1);
_mm_storeu_pd(max_ptr + row + 2, mm_max2);
_mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
}
// Process 2 doubles at once
if (tail_size >= 2)
{
const auto* col_ptr = val_ptr + row;
__m128d mm_max1 = _mm_loadu_pd(col_ptr);
__m128i mm_max_pos = _mm_setzero_si128();
__m128i mm_pos = mm_one;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
__m128d mm_value1 = _mm_loadu_pd(col_ptr);
// See if this value is greater than the old maximum
__m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
// Compress the mask. The upper half doesn't matter.
__m128i mm_mask = _mm_packs_epi32(
_mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask1));
// If it is, save its position
mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);
// Compute the maximum
mm_max1 = _mm_max_pd(mm_value1, mm_max1);
mm_pos = _mm_add_epi32(mm_pos, mm_one);
col_ptr += n_rows;
}
// Store the results
_mm_storeu_pd(max_ptr + row, mm_max1);
// Only store the lower two positions
_mm_storel_epi64(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
row += 2;
}
// Process tail serially
for (; row < n_rows; ++row)
{
const auto* col_ptr = val_ptr + row;
auto max = *col_ptr;
int max_pos = 0;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
auto value = *col_ptr;
if (value > max)
{
max = value;
max_pos = col;
}
col_ptr += n_rows;
}
max_ptr[row] = max;
opt_pos_ptr[row] = max_pos;
}
}