Optimieren des spaltenweisen Maximums mit SIMD
Ich habe diese Funktion, bei der ich viel Zeit in meinem Code verbracht habe, und ich möchte sie nach Möglichkeit durch Vektorisierungs-SIMD-Compiler-Intrinsik optimieren.
Es findet im Wesentlichen den Wert und die Position des Maximums über einer Matrix über Spalten und speichert sie:
- val_ptr: Eingabematrix: Hauptspalte (Fortran-Stil) n_rows-by-n_cols (wobei typischerweise n_rows>>n_cols)
- opt_pos_ptr : int Vektor der Länge n_rows wo die Position des Maximums gespeichert werden soll. Bei der Eingabe mit Nullen gefüllt.
- max_ptr: Float-Vektor der Länge n_rows, wo das Maximum gespeichert werden soll. Beim Eintrag gefüllt mit Kopien der ersten Spalte von val_ptr
- Die Funktion wird in einer parallelen Schleife aufgerufen
- Es ist garantiert, dass sich die Speicherbereiche nicht überlappen
- Ich brauche das max_ptr nicht wirklich, um es zu füllen, derzeit wird es nur für die Buchhaltung verwendet und um die Speicherzuweisung zu vermeiden
- Ich verwende MSVC, C ++ 17 unter Windows 10. Soll moderne Intel-CPUs ausführen
Der Code, bei dem der Vorlagentyp Float oder Double sein soll:
template <typename eT>
find_max(const int n_cols,
const int n_rows,
const eT* val_ptr,
int* opt_pos_ptr,
eT* max_ptr){
for (int col = 1; col < n_cols; ++col)
{
//Getting the pointer to the beginning of the column
const auto* value_col = val_ptr + col * n_rows;
//Looping over the rows
for (int row = 0; row < n_rows; ++row)
{
//If the value is larger than the current maximum, we replace and we store its positions
if (value_col[row] > max_ptr[row])
{
max_ptr[row] = value_col[row];
opt_pos_ptr[row] = col;
}
}
}
}
Was ich bisher versucht habe:
- Ich habe versucht, OpenMP parallel für die innere Schleife zu verwenden, bringt aber etwas nur bei sehr großen Zeilen, die etwas größer sind als meine derzeitige Verwendung.
- Die if in der inneren Schleife verhindert, dass #pragma omp simd funktioniert, und ich konnte es ohne sie nicht neu schreiben.
Antworten
Basierend auf dem von Ihnen geposteten Codebeispiel sieht es so aus, als ob Sie einen vertikalen Maximalwert berechnen möchten, was bedeutet, dass in Ihrem Fall "Spalten" horizontal sind. In C/C++ werden horizontale Folgen von Elementen (dh wo zwei benachbarte Elemente einen Abstand von einem Element im Speicher haben) normalerweise Zeilen genannt und vertikale (wo zwei benachbarte Elemente einen Abstand von der Zeilengröße im Speicher haben) - Spalten. In meiner Antwort unten werde ich die traditionelle Terminologie verwenden, bei der Zeilen horizontal und Spalten vertikal sind.
Außerdem werde ich mich der Kürze halber auf einen möglichen Typ des Matrixelements konzentrieren – float
. Die Grundidee ist die gleiche wie bei double
, wobei der Hauptunterschied in der Anzahl der Elemente pro Vektor und der _ps
/ _pd
-Intrinsic-Auswahl liegt. Ich werde eine Version für double
am Ende bereitstellen.
_mm_max_ps
Die Idee ist, dass Sie mit / das vertikale Maximum für mehrere Spalten parallel berechnen können _mm_max_pd
. Um auch die Position des gefundenen Maximums festzuhalten, können Sie das vorherige Maximum mit den aktuellen Elementen vergleichen. Das Ergebnis des Vergleichs ist eine Maske, wo die Elemente alle Einsen sind, wo das Maximum aktualisiert wird. Diese Maske kann verwendet werden, um auszuwählen, welche Position ebenfalls aktualisiert werden muss.
Ich muss beachten, dass der folgende Algorithmus davon ausgeht, dass es nicht wichtig ist, welche Position des maximalen Elements aufgezeichnet wird, wenn es mehrere gleiche maximale Elemente in einer Spalte gibt. Außerdem gehe ich davon aus, dass die Matrix keine NaN-Werte enthält, was sich auf die Vergleiche auswirken würde. Dazu später mehr.
void find_max(const int n_cols,
const int n_rows,
const float* val_ptr,
int* opt_pos_ptr,
float* max_ptr){
const __m128i mm_one = _mm_set1_epi32(1);
// Pre-compute the number of rows that can be processed in full vector width.
// In a 128-bit vector there are 4 floats or 2 doubles
int tail_size = n_rows & 3;
int n_rows_aligned = n_rows - tail_size;
int row = 0;
for (; row < n_rows_aligned; row += 4)
{
const auto* col_ptr = val_ptr + row;
__m128 mm_max = _mm_loadu_ps(col_ptr);
__m128i mm_max_pos = _mm_setzero_si128();
__m128i mm_pos = mm_one;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
__m128 mm_value = _mm_loadu_ps(col_ptr);
// See if this value is greater than the old maximum
__m128 mm_mask = _mm_cmplt_ps(mm_max, mm_value);
// If it is, save its position
mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, _mm_castps_si128(mm_mask));
// Compute the maximum
mm_max = _mm_max_ps(mm_value, mm_max);
mm_pos = _mm_add_epi32(mm_pos, mm_one);
col_ptr += n_rows;
}
// Store the results
_mm_storeu_ps(max_ptr + row, mm_max);
_mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
}
// Process tail serially
for (; row < n_rows; ++row)
{
const auto* col_ptr = val_ptr + row;
auto max = *col_ptr;
int max_pos = 0;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
auto value = *col_ptr;
if (value > max)
{
max = value;
max_pos = col;
}
col_ptr += n_rows;
}
max_ptr[row] = max;
opt_pos_ptr[row] = max_pos;
}
}
Der obige Code erfordert SSE4.1 aufgrund der Blending-Intrinsics. Sie können diese durch eine Kombination aus _mm_and_si128
/ _ps
, _mm_andnot_si128
/ _ps
und _mm_or_si128
/ ersetzen _ps
, in diesem Fall werden die Anforderungen auf SSE2 gesenkt. Im Intel Intrinsics Guide finden Sie weitere Einzelheiten zu den jeweiligen Intrinsics, einschließlich der erforderlichen Befehlssatzerweiterungen.
Eine Anmerkung zu den NaN-Werten. Wenn Ihre Matrix NaNs haben kann, gibt der _mm_cmplt_ps
Test immer falsch zurück. Was _mm_max_ps
es betrifft, ist im Allgemeinen nicht bekannt, was es zurückgeben wird. Die maxps
Anweisung, in die das Intrinsic übersetzt wird, gibt ihren zweiten (Quellen-)Operanden zurück, wenn einer der Operanden ein NaN ist, sodass Sie durch Anordnen der Operanden der Anweisung beide Verhaltensweisen erreichen können. Es ist jedoch nicht dokumentiert, welches Argument des _mm_max_ps
Intrinsic welchen Operanden der Anweisung darstellt, und es ist sogar möglich, dass der Compiler in verschiedenen Fällen unterschiedliche Assoziationen verwendet. Weitere Informationen finden Sie in dieser Antwort.
Um das korrekte Verhalten bzgl. NaNs könnten Sie Inline-Assembler verwenden, um die richtige Reihenfolge der maxps
Operanden zu erzwingen. Leider ist dies keine Option mit MSVC für x86-64-Ziel, von dem Sie sagten, dass Sie es verwenden. Stattdessen könnten Sie das _mm_cmplt_ps
Ergebnis für eine zweite Mischung wie diese wiederverwenden:
// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, mm_mask);
Dadurch werden NaNs in den resultierenden Maximalwerten unterdrückt. Wenn Sie stattdessen NaNs behalten möchten, können Sie einen zweiten Vergleich verwenden, um NaNs zu erkennen:
// Detect NaNs
__m128 mm_nan_mask = _mm_cmpunord_ps(mm_value, mm_value);
// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, _mm_or_ps(mm_mask, mm_nan_mask));
Sie könnten die Leistung des obigen Algorithmus wahrscheinlich weiter verbessern, wenn Sie breitere Vektoren ( __m256
oder __m512
) verwenden und die äußere Schleife um einen kleinen Faktor entrollen, sodass bei jeder Iteration der inneren Schleife mindestens eine Cache-Zeile mit Zeilendaten geladen wird.
Hier ist ein Implementierungsbeispiel für double
. double
Der wichtige Punkt, der hier zu beachten ist, ist, dass wir , da es nur zwei Elemente pro Vektor und immer noch vier Positionen pro Vektor gibt, die äußere Schleife entrollen müssen, um zwei Vektoren double
gleichzeitig zu verarbeiten, und dann die beiden Masken aus Vergleichen mit komprimieren müssen vorherigen Maxima, um die 32-Bit-Positionen zu mischen.
void find_max(const int n_cols,
const int n_rows,
const double* val_ptr,
int* opt_pos_ptr,
double* max_ptr){
const __m128i mm_one = _mm_set1_epi32(1);
// Pre-compute the number of rows that can be processed in full vector width.
// In a 128-bit vector there are 2 doubles, but we want to process
// two vectors at a time.
int tail_size = n_rows & 3;
int n_rows_aligned = n_rows - tail_size;
int row = 0;
for (; row < n_rows_aligned; row += 4)
{
const auto* col_ptr = val_ptr + row;
__m128d mm_max1 = _mm_loadu_pd(col_ptr);
__m128d mm_max2 = _mm_loadu_pd(col_ptr + 2);
__m128i mm_max_pos = _mm_setzero_si128();
__m128i mm_pos = mm_one;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
__m128d mm_value1 = _mm_loadu_pd(col_ptr);
__m128d mm_value2 = _mm_loadu_pd(col_ptr + 2);
// See if this value is greater than the old maximum
__m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
__m128d mm_mask2 = _mm_cmplt_pd(mm_max2, mm_value2);
// Compress the 2 masks into one
__m128i mm_mask = _mm_packs_epi32(
_mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask2));
// If it is, save its position
mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);
// Compute the maximum
mm_max1 = _mm_max_pd(mm_value1, mm_max1);
mm_max2 = _mm_max_pd(mm_value2, mm_max2);
mm_pos = _mm_add_epi32(mm_pos, mm_one);
col_ptr += n_rows;
}
// Store the results
_mm_storeu_pd(max_ptr + row, mm_max1);
_mm_storeu_pd(max_ptr + row + 2, mm_max2);
_mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
}
// Process 2 doubles at once
if (tail_size >= 2)
{
const auto* col_ptr = val_ptr + row;
__m128d mm_max1 = _mm_loadu_pd(col_ptr);
__m128i mm_max_pos = _mm_setzero_si128();
__m128i mm_pos = mm_one;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
__m128d mm_value1 = _mm_loadu_pd(col_ptr);
// See if this value is greater than the old maximum
__m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
// Compress the mask. The upper half doesn't matter.
__m128i mm_mask = _mm_packs_epi32(
_mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask1));
// If it is, save its position
mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);
// Compute the maximum
mm_max1 = _mm_max_pd(mm_value1, mm_max1);
mm_pos = _mm_add_epi32(mm_pos, mm_one);
col_ptr += n_rows;
}
// Store the results
_mm_storeu_pd(max_ptr + row, mm_max1);
// Only store the lower two positions
_mm_storel_epi64(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
row += 2;
}
// Process tail serially
for (; row < n_rows; ++row)
{
const auto* col_ptr = val_ptr + row;
auto max = *col_ptr;
int max_pos = 0;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
auto value = *col_ptr;
if (value > max)
{
max = value;
max_pos = col;
}
col_ptr += n_rows;
}
max_ptr[row] = max;
opt_pos_ptr[row] = max_pos;
}
}