optimisation du maximum par colonne avec SIMD
J'ai cette fonction où j'ai passé beaucoup de temps dans mon code, et j'aimerais l'optimiser par des intrinsèques de vectorisation-SIMD-compilateur, si possible.
Il trouve essentiellement la valeur et l'emplacement du maximum sur une matrice sur des colonnes et les stocke :
- val_ptr : matrice d'entrée : colonne majeure (style Fortran) n_rows-by-n_cols (où généralement n_rows>>n_cols)
- opt_pos_ptr : vecteur int de longueur n_rows où stocker la position du maximum. Sur entrée remplie de zéros.
- max_ptr : vecteur flottant de longueur n_rows où stocker le maximum. Sur entrée remplie avec des copies de la première colonne de val_ptr
- La fonction sera appelée dans une boucle parallèle
- La région de mémoire est garantie de ne pas se chevaucher
- Je n'ai pas vraiment besoin que le max_ptr soit rempli, actuellement il est juste utilisé pour la comptabilité et pour éviter l'allocation de mémoire
- J'utilise MSVC, C++ 17 sur Windows 10. Conçu pour exécuter des processeurs Intel modernes
Le code, où le type de modèle est censé être float ou double :
template <typename eT>
find_max(const int n_cols,
const int n_rows,
const eT* val_ptr,
int* opt_pos_ptr,
eT* max_ptr){
for (int col = 1; col < n_cols; ++col)
{
//Getting the pointer to the beginning of the column
const auto* value_col = val_ptr + col * n_rows;
//Looping over the rows
for (int row = 0; row < n_rows; ++row)
{
//If the value is larger than the current maximum, we replace and we store its positions
if (value_col[row] > max_ptr[row])
{
max_ptr[row] = value_col[row];
opt_pos_ptr[row] = col;
}
}
}
}
Ce que j'ai essayé jusqu'à présent :
- J'ai essayé d'utiliser OpenMP parallèle pour la boucle interne, mais n'apporte quelque chose que sur de très grandes lignes, un peu plus grandes que mon utilisation actuelle.
- Le si dans la boucle interne empêche #pragma omp simd de fonctionner, et je n'ai pas pu le réécrire sans lui.
Réponses
Sur la base de l'exemple de code que vous avez publié, il semble que vous souhaitiez calculer une valeur maximale verticale, ce qui signifie que dans votre cas, les "colonnes" sont horizontales. En C/C++, les séquences horizontales d'éléments (c'est-à-dire où deux éléments adjacents ont une distance d'un élément en mémoire) sont normalement appelées lignes et verticales (où deux éléments adjacents ont une distance de taille de ligne en mémoire) - colonnes. Dans ma réponse ci-dessous, j'utiliserai la terminologie traditionnelle, où les lignes sont horizontales et les colonnes sont verticales.
Aussi, par souci de brièveté, je me concentrerai sur un type possible d'élément de matrice - float
. L'idée de base est la même pour double
, la principale différence étant le nombre d'éléments par vecteur et la sélection _ps
/ _pd
intrinsèques. Je fournirai une version pour double
à la fin.
L'idée est que vous pouvez calculer le maximum vertical pour plusieurs colonnes en parallèle en utilisant _mm_max_ps
/ _mm_max_pd
. Afin d'enregistrer également la position du maximum trouvé, vous pouvez comparer le maximum précédent avec les éléments actuels. Le résultat de la comparaison est un masque, où les éléments sont tous des uns où le maximum est mis à jour. Ce masque peut être utilisé pour sélectionner la position qui doit également être mise à jour.
Je dois noter que l'algorithme ci-dessous suppose qu'il n'est pas important de savoir quelle position d'élément max est enregistrée, s'il y a plusieurs éléments max égaux dans une colonne. De plus, je suppose que la matrice ne contient pas de valeurs NaN, ce qui affecterait les comparaisons. Plus à ce sujet plus tard.
void find_max(const int n_cols,
const int n_rows,
const float* val_ptr,
int* opt_pos_ptr,
float* max_ptr){
const __m128i mm_one = _mm_set1_epi32(1);
// Pre-compute the number of rows that can be processed in full vector width.
// In a 128-bit vector there are 4 floats or 2 doubles
int tail_size = n_rows & 3;
int n_rows_aligned = n_rows - tail_size;
int row = 0;
for (; row < n_rows_aligned; row += 4)
{
const auto* col_ptr = val_ptr + row;
__m128 mm_max = _mm_loadu_ps(col_ptr);
__m128i mm_max_pos = _mm_setzero_si128();
__m128i mm_pos = mm_one;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
__m128 mm_value = _mm_loadu_ps(col_ptr);
// See if this value is greater than the old maximum
__m128 mm_mask = _mm_cmplt_ps(mm_max, mm_value);
// If it is, save its position
mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, _mm_castps_si128(mm_mask));
// Compute the maximum
mm_max = _mm_max_ps(mm_value, mm_max);
mm_pos = _mm_add_epi32(mm_pos, mm_one);
col_ptr += n_rows;
}
// Store the results
_mm_storeu_ps(max_ptr + row, mm_max);
_mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
}
// Process tail serially
for (; row < n_rows; ++row)
{
const auto* col_ptr = val_ptr + row;
auto max = *col_ptr;
int max_pos = 0;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
auto value = *col_ptr;
if (value > max)
{
max = value;
max_pos = col;
}
col_ptr += n_rows;
}
max_ptr[row] = max;
opt_pos_ptr[row] = max_pos;
}
}
Le code ci-dessus nécessite SSE4.1 en raison des intrinsèques de mélange. Vous pouvez les remplacer par une combinaison de _mm_and_si128
/ _ps
, _mm_andnot_si128
/ _ps
et _mm_or_si128
/ _ps
, auquel cas les exigences seront abaissées à SSE2. Voir Intel Intrinsics Guide pour plus de détails sur les intrinsèques particuliers, y compris les extensions de jeu d'instructions dont ils ont besoin.
Une note sur les valeurs NaN. Si votre matrice peut avoir des NaN, le _mm_cmplt_ps
test renverra toujours faux. Quant à _mm_max_ps
, on ne sait généralement pas ce qu'il donnera. L' maxps
instruction que l'intrinsèque traduit renvoie son deuxième opérande (source) si l'un des opérandes est un NaN, donc en organisant les opérandes de l'instruction, vous pouvez obtenir l'un ou l'autre comportement. Cependant, il n'est pas documenté quel argument de l' _mm_max_ps
intrinsèque représente quel opérande de l'instruction, et il est même possible que le compilateur utilise une association différente dans différents cas. Voir cette réponse pour plus de détails.
Afin d'assurer le bon comportement wrt. NaNs, vous pouvez utiliser l'assembleur en ligne pour forcer le bon ordre des maxps
opérandes. Malheureusement, ce n'est pas une option avec MSVC pour la cible x86-64, que vous avez dit utiliser, donc à la place, vous pouvez réutiliser le _mm_cmplt_ps
résultat pour un deuxième mélange comme celui-ci :
// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, mm_mask);
Cela supprimera les NaN dans les valeurs maximales résultantes. Si vous souhaitez conserver les NaN à la place, vous pouvez utiliser une deuxième comparaison pour détecter les NaN :
// Detect NaNs
__m128 mm_nan_mask = _mm_cmpunord_ps(mm_value, mm_value);
// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, _mm_or_ps(mm_mask, mm_nan_mask));
Vous pourriez probablement encore améliorer les performances de l'algorithme ci-dessus si vous utilisez des vecteurs plus larges ( __m256
ou __m512
) et déroulez la boucle externe d'un petit facteur, de sorte qu'au moins une ligne de cache de données de ligne soit chargée à chaque itération de la boucle interne.
Voici un exemple d'implémentation pour double
. Le point important à noter ici est que, comme il n'y a que deux double
éléments par vecteur et qu'il y a encore quatre positions par vecteur, nous devons dérouler la boucle externe pour traiter deux vecteurs double
à la fois, puis compresser les deux masques à partir des comparaisons avec le maximums précédents pour mélanger les positions 32 bits.
void find_max(const int n_cols,
const int n_rows,
const double* val_ptr,
int* opt_pos_ptr,
double* max_ptr){
const __m128i mm_one = _mm_set1_epi32(1);
// Pre-compute the number of rows that can be processed in full vector width.
// In a 128-bit vector there are 2 doubles, but we want to process
// two vectors at a time.
int tail_size = n_rows & 3;
int n_rows_aligned = n_rows - tail_size;
int row = 0;
for (; row < n_rows_aligned; row += 4)
{
const auto* col_ptr = val_ptr + row;
__m128d mm_max1 = _mm_loadu_pd(col_ptr);
__m128d mm_max2 = _mm_loadu_pd(col_ptr + 2);
__m128i mm_max_pos = _mm_setzero_si128();
__m128i mm_pos = mm_one;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
__m128d mm_value1 = _mm_loadu_pd(col_ptr);
__m128d mm_value2 = _mm_loadu_pd(col_ptr + 2);
// See if this value is greater than the old maximum
__m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
__m128d mm_mask2 = _mm_cmplt_pd(mm_max2, mm_value2);
// Compress the 2 masks into one
__m128i mm_mask = _mm_packs_epi32(
_mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask2));
// If it is, save its position
mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);
// Compute the maximum
mm_max1 = _mm_max_pd(mm_value1, mm_max1);
mm_max2 = _mm_max_pd(mm_value2, mm_max2);
mm_pos = _mm_add_epi32(mm_pos, mm_one);
col_ptr += n_rows;
}
// Store the results
_mm_storeu_pd(max_ptr + row, mm_max1);
_mm_storeu_pd(max_ptr + row + 2, mm_max2);
_mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
}
// Process 2 doubles at once
if (tail_size >= 2)
{
const auto* col_ptr = val_ptr + row;
__m128d mm_max1 = _mm_loadu_pd(col_ptr);
__m128i mm_max_pos = _mm_setzero_si128();
__m128i mm_pos = mm_one;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
__m128d mm_value1 = _mm_loadu_pd(col_ptr);
// See if this value is greater than the old maximum
__m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
// Compress the mask. The upper half doesn't matter.
__m128i mm_mask = _mm_packs_epi32(
_mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask1));
// If it is, save its position
mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);
// Compute the maximum
mm_max1 = _mm_max_pd(mm_value1, mm_max1);
mm_pos = _mm_add_epi32(mm_pos, mm_one);
col_ptr += n_rows;
}
// Store the results
_mm_storeu_pd(max_ptr + row, mm_max1);
// Only store the lower two positions
_mm_storel_epi64(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
row += 2;
}
// Process tail serially
for (; row < n_rows; ++row)
{
const auto* col_ptr = val_ptr + row;
auto max = *col_ptr;
int max_pos = 0;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
auto value = *col_ptr;
if (value > max)
{
max = value;
max_pos = col;
}
col_ptr += n_rows;
}
max_ptr[row] = max;
opt_pos_ptr[row] = max_pos;
}
}