Optimising column-wise maximum with SIMD
I have a function in which my code spends a significant amount of time, and I would like to optimise it with vectorization (SIMD compiler intrinsics), if possible.
It essentially finds, for each row, the value and the column position of the maximum across the columns of a matrix, and stores them:
- val_ptr: input matrix: column-major (Fortran-style) n_rows-by-n_cols (where typically n_rows>>n_cols)
- opt_pos_ptr : int vector of length n_rows where to store the position of the maximum. On entry filled with zeros.
- max_ptr: float vector of length n_rows where to store the maximum. On entry filled with copies of the first column of val_ptr
- The function will be called in a parallel loop
- The memory regions are guaranteed not to overlap
- I don't really need max_ptr to be filled; it is currently used only for bookkeeping and to avoid a memory allocation
- I use MSVC, C++17 on Windows 10. The code is meant to run on modern Intel CPUs
The code, where the template type is meant to be float or double:
template <typename eT>
void find_max(const int n_cols,
              const int n_rows,
              const eT* val_ptr,
              int* opt_pos_ptr,
              eT* max_ptr){
    for (int col = 1; col < n_cols; ++col)
    {
        //Getting the pointer to the beginning of the column
        const auto* value_col = val_ptr + col * n_rows;
        //Looping over the rows
        for (int row = 0; row < n_rows; ++row)
        {
            //If the value is larger than the current maximum, we replace and we store its positions
            if (value_col[row] > max_ptr[row])
            {
                max_ptr[row] = value_col[row];
                opt_pos_ptr[row] = col;
            }
        }
    }
}
What I have tried so far:
- I tried using OpenMP parallel for on the inner loop, but it only helps for very large numbers of rows, somewhat larger than my current usage.
- The if in the inner loop prevents #pragma omp simd from working, and I was not able to rewrite the loop without it.
Answers
Based on the code sample you posted, it looks like you want to compute a vertical maximum, meaning that in your case the "columns" are horizontal. In C/C++, horizontal sequences of elements (i.e. where two adjacent elements are one element apart in memory) are normally called rows, and vertical ones (where two adjacent elements are one row size apart in memory) are called columns. In my answer below I will use the traditional terminology, where rows are horizontal and columns are vertical.
Also, for brevity, I will focus on one possible matrix element type: float. The basic idea is the same for double; the main differences are the number of elements per vector and the choice of _ps/_pd intrinsics. I will provide a version for double at the end.
The idea is that you can compute the vertical maximum for multiple columns in parallel using _mm_max_ps/_mm_max_pd. In order to also record the position of the found maximum, you can compare the previous maximum with the current elements. The result of the comparison is a mask, where the elements are all-ones where the maximum is updated. That mask can be used to select which position needs to be updated as well.
I must note that the algorithm below assumes that it is not important which max element's position is recorded, if there are multiple equal max elements in a column. Also, I assume the matrix does not contain NaN values, which would affect the comparisons. More on this later.
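To make the mask-and-select idea concrete before the intrinsics version that follows, here is a minimal scalar sketch for a single column (in this answer's terminology). The function name `find_max_branchless` and the explicit all-ones/all-zeros integer mask are illustrative assumptions, not part of the question's code; `stride` plays the role of `n_rows`, the distance in memory between consecutive elements being compared.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical scalar model of the mask-and-select update for ONE lane.
// Element k of the sequence lives at val_ptr[k * stride + offset].
// Writes the maximum to *max_out and returns the index where it was first seen.
inline int find_max_branchless(const float* val_ptr, int stride, int count,
                               int offset, float* max_out)
{
    assert(count >= 1 && offset >= 0 && offset < stride);
    float max = val_ptr[offset];  // seed with element 0
    std::int32_t max_pos = 0;
    for (int k = 1; k < count; ++k)
    {
        const float value = val_ptr[static_cast<std::ptrdiff_t>(k) * stride + offset];
        // All-ones where the maximum must be updated, all-zeros otherwise
        // (the per-lane result that _mm_cmplt_ps produces).
        const std::int32_t mask = -static_cast<std::int32_t>(max < value);
        // Select the new position where the mask is set (what the blend does).
        max_pos = (k & mask) | (max_pos & ~mask);
        // Update the maximum itself.
        max = (max < value) ? value : max;
    }
    *max_out = max;
    return max_pos;
}
```

Because the comparison is strict, the first of several equal maxima is the one whose position is kept, which matches the behaviour of the original loop in the question.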
#include <smmintrin.h> // SSE4.1, needed for _mm_blendv_epi8

void find_max(const int n_cols,
              const int n_rows,
              const float* val_ptr,
              int* opt_pos_ptr,
              float* max_ptr){
    const __m128i mm_one = _mm_set1_epi32(1);
    // Pre-compute the number of rows that can be processed in full vector width.
    // In a 128-bit vector there are 4 floats or 2 doubles
    int tail_size = n_rows & 3;
    int n_rows_aligned = n_rows - tail_size;
    int row = 0;
    for (; row < n_rows_aligned; row += 4)
    {
        const auto* col_ptr = val_ptr + row;
        __m128 mm_max = _mm_loadu_ps(col_ptr);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128 mm_value = _mm_loadu_ps(col_ptr);
            // See if this value is greater than the old maximum
            __m128 mm_mask = _mm_cmplt_ps(mm_max, mm_value);
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, _mm_castps_si128(mm_mask));
            // Compute the maximum
            mm_max = _mm_max_ps(mm_value, mm_max);
            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }
        // Store the results
        _mm_storeu_ps(max_ptr + row, mm_max);
        _mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
    }
    // Process tail serially
    for (; row < n_rows; ++row)
    {
        const auto* col_ptr = val_ptr + row;
        auto max = *col_ptr;
        int max_pos = 0;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            auto value = *col_ptr;
            if (value > max)
            {
                max = value;
                max_pos = col;
            }
            col_ptr += n_rows;
        }
        max_ptr[row] = max;
        opt_pos_ptr[row] = max_pos;
    }
}
The code above requires SSE4.1 because of the blending intrinsics. You can replace those with a combination of _mm_and_si128/_ps, _mm_andnot_si128/_ps and _mm_or_si128/_ps, in which case the requirements will be lowered to SSE2. See the Intel Intrinsics Guide for more details on the particular intrinsics, including which instruction set extensions they require.
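As a sketch of that SSE2 fallback, the helper below emulates the integer blend with AND/ANDNOT/OR. The names `select_epi32_sse2`, `select4` and the macro `HAVE_SSE2_SELECT` are hypothetical. The code assumes every mask lane is either all-ones or all-zeros (which is what the comparison intrinsics produce), and it uses the same bit formula in scalar code on targets without SSE2.

```cpp
#include <cassert>
#include <cstdint>

#if defined(__SSE2__) || defined(_M_X64) || (defined(_M_IX86_FP) && _M_IX86_FP >= 2)
#include <emmintrin.h>  // SSE2
#define HAVE_SSE2_SELECT 1

// Per-lane select: returns b where mask is all-ones, a where it is all-zeros.
// Same argument order as _mm_blendv_epi8(a, b, mask).
inline __m128i select_epi32_sse2(__m128i a, __m128i b, __m128i mask)
{
    return _mm_or_si128(_mm_and_si128(mask, b),      // mask & b
                        _mm_andnot_si128(mask, a));  // ~mask & a
}
#endif

// Illustrative wrapper over four 32-bit lanes stored in memory.
inline void select4(const std::int32_t* a, const std::int32_t* b,
                    const std::int32_t* mask, std::int32_t* out)
{
    assert(a && b && mask && out);
#ifdef HAVE_SSE2_SELECT
    const __m128i va = _mm_loadu_si128(reinterpret_cast<const __m128i*>(a));
    const __m128i vb = _mm_loadu_si128(reinterpret_cast<const __m128i*>(b));
    const __m128i vm = _mm_loadu_si128(reinterpret_cast<const __m128i*>(mask));
    _mm_storeu_si128(reinterpret_cast<__m128i*>(out), select_epi32_sse2(va, vb, vm));
#else
    // Scalar fallback using the same formula.
    for (int i = 0; i < 4; ++i)
        out[i] = (mask[i] & b[i]) | (~mask[i] & a[i]);
#endif
}
```

In the float routine, `mm_max_pos = select_epi32_sse2(mm_max_pos, mm_pos, _mm_castps_si128(mm_mask));` would stand in for the `_mm_blendv_epi8` call.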
A note about NaN values. If your matrix can have NaNs, the _mm_cmplt_ps test will return false for every element where either operand is a NaN. As for _mm_max_ps, it is generally not known what it will return. The maxps instruction that the intrinsic translates to returns its second (source) operand if either of the operands is a NaN, so by arranging the operands of the instruction you can achieve either behavior. However, it is not documented which argument of the _mm_max_ps intrinsic represents which operand of the instruction, and it is even possible that the compiler may use a different association in different cases. See this answer for more details.
In order to ensure the correct behavior with respect to NaNs, you could use inline assembler to force the correct order of the maxps operands. Unfortunately, that is not an option with MSVC for the x86-64 target, which you said you're using, so instead you could reuse the _mm_cmplt_ps result for a second blend like this:
// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, mm_mask);
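This will suppress NaNs in the resulting max values, as long as the first element of the column (which seeds mm_max) is not itself a NaN: a NaN already in mm_max makes every later comparison false and so is never replaced. If you want to keep NaNs instead, you could use a second comparison to detect NaNs: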
// Detect NaNs
__m128 mm_nan_mask = _mm_cmpunord_ps(mm_value, mm_value);
// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, _mm_or_ps(mm_mask, mm_nan_mask));
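The two policies can be checked against a scalar model. The sketch below is only an illustration (the function names are made up here). It relies on the IEEE 754 rule that any ordered comparison involving a NaN is false, so it assumes the compiler is not run with fast-math style options.

```cpp
#include <cassert>
#include <cmath>

// Mirrors blending with the "max < value" mask only: a NaN value never
// replaces the current maximum.
inline float update_max_ignore_nan(float max, float value)
{
    const bool mask = max < value;  // false if either side is NaN
    return mask ? value : max;
}

// Mirrors OR-ing in the "value is NaN" mask: a NaN value always replaces the
// current maximum, and then sticks because "NaN < x" is false afterwards.
inline float update_max_keep_nan(float max, float value)
{
    const bool mask = max < value;
    const bool nan_mask = std::isnan(value);  // like _mm_cmpunord_ps(v, v)
    return (mask || nan_mask) ? value : max;
}

// Hypothetical helper: applies one of the update rules over n values,
// seeding the maximum with the first one.
template <typename Update>
inline float reduce_values(const float* values, int n, Update update)
{
    assert(n >= 1);
    float max = values[0];
    for (int i = 1; i < n; ++i)
        max = update(max, values[i]);
    return max;
}
```

For the values {1, NaN, 3, 2}, the first rule yields 3 and the second yields NaN. With either rule, a NaN in the seed element stays in the result.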
You could probably further improve the performance of the algorithm above if you use wider vectors (__m256 or __m512) and unroll the outer loop by a small factor, so that at least a cache line worth of row data is loaded on every iteration of the inner loop.
Here is an example implementation for double. The important point to note here is that because there are only two double elements per vector and there are still four positions per vector, we have to unroll the outer loop to process two vectors of double at a time and then compress the two masks from comparisons with the previous maximums to blend the 32-bit positions.
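The mask compression step can be looked at in isolation. The following sketch (the helper name `packed_lt_mask4` and the macro `HAVE_SSE2_PACK` are hypothetical) compares four consecutive doubles against four candidates and produces one 32-bit mask per element. It needs only SSE2 and uses a plain scalar loop on targets without it.

```cpp
#include <cassert>
#include <cstdint>

#if defined(__SSE2__) || defined(_M_X64) || (defined(_M_IX86_FP) && _M_IX86_FP >= 2)
#include <emmintrin.h>  // SSE2
#define HAVE_SSE2_PACK 1
#endif

// Writes one 32-bit mask per element (-1 where max[i] < value[i], else 0)
// for four consecutive doubles.
inline void packed_lt_mask4(const double* max, const double* value, std::int32_t* out)
{
    assert(max && value && out);
#ifdef HAVE_SSE2_PACK
    // Each comparison yields two 64-bit lanes of all-ones or all-zeros.
    const __m128d m1 = _mm_cmplt_pd(_mm_loadu_pd(max), _mm_loadu_pd(value));
    const __m128d m2 = _mm_cmplt_pd(_mm_loadu_pd(max + 2), _mm_loadu_pd(value + 2));
    // Viewed as 32-bit integers, each 64-bit lane is a pair of -1s or 0s.
    // The saturating pack narrows them to 16 bits (-1 stays -1, 0 stays 0),
    // so each pair of adjacent 16-bit results forms one 32-bit mask, in order.
    const __m128i packed = _mm_packs_epi32(_mm_castpd_si128(m1), _mm_castpd_si128(m2));
    _mm_storeu_si128(reinterpret_cast<__m128i*>(out), packed);
#else
    // Scalar fallback producing the same masks.
    for (int i = 0; i < 4; ++i)
        out[i] = -static_cast<std::int32_t>(max[i] < value[i]);
#endif
}
```

The resulting four lanes line up with the four 32-bit positions, so they can be fed directly to the position blend.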
#include <smmintrin.h> // SSE4.1, needed for _mm_blendv_epi8

void find_max(const int n_cols,
              const int n_rows,
              const double* val_ptr,
              int* opt_pos_ptr,
              double* max_ptr){
    const __m128i mm_one = _mm_set1_epi32(1);
    // Pre-compute the number of rows that can be processed in full vector width.
    // In a 128-bit vector there are 2 doubles, but we want to process
    // two vectors at a time.
    int tail_size = n_rows & 3;
    int n_rows_aligned = n_rows - tail_size;
    int row = 0;
    for (; row < n_rows_aligned; row += 4)
    {
        const auto* col_ptr = val_ptr + row;
        __m128d mm_max1 = _mm_loadu_pd(col_ptr);
        __m128d mm_max2 = _mm_loadu_pd(col_ptr + 2);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128d mm_value1 = _mm_loadu_pd(col_ptr);
            __m128d mm_value2 = _mm_loadu_pd(col_ptr + 2);
            // See if this value is greater than the old maximum
            __m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
            __m128d mm_mask2 = _mm_cmplt_pd(mm_max2, mm_value2);
            // Compress the 2 masks into one
            __m128i mm_mask = _mm_packs_epi32(
                _mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask2));
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);
            // Compute the maximum
            mm_max1 = _mm_max_pd(mm_value1, mm_max1);
            mm_max2 = _mm_max_pd(mm_value2, mm_max2);
            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }
        // Store the results
        _mm_storeu_pd(max_ptr + row, mm_max1);
        _mm_storeu_pd(max_ptr + row + 2, mm_max2);
        _mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
    }
    // Process 2 doubles at once
    if (tail_size >= 2)
    {
        const auto* col_ptr = val_ptr + row;
        __m128d mm_max1 = _mm_loadu_pd(col_ptr);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128d mm_value1 = _mm_loadu_pd(col_ptr);
            // See if this value is greater than the old maximum
            __m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
            // Compress the mask. The upper half doesn't matter.
            __m128i mm_mask = _mm_packs_epi32(
                _mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask1));
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);
            // Compute the maximum
            mm_max1 = _mm_max_pd(mm_value1, mm_max1);
            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }
        // Store the results
        _mm_storeu_pd(max_ptr + row, mm_max1);
        // Only store the lower two positions
        _mm_storel_epi64(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
        row += 2;
    }
    // Process tail serially
    for (; row < n_rows; ++row)
    {
        const auto* col_ptr = val_ptr + row;
        auto max = *col_ptr;
        int max_pos = 0;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            auto value = *col_ptr;
            if (value > max)
            {
                max = value;
                max_pos = col;
            }
            col_ptr += n_rows;
        }
        max_ptr[row] = max;
        opt_pos_ptr[row] = max_pos;
    }
}