ottimizzazione del massimo per colonna con SIMD

Aug 15 2020

Ho questa funzione in cui ho trascorso una notevole quantità di tempo nel mio codice e vorrei ottimizzarlo con le intrinseche del compilatore di vettorizzazione-SIMD, se possibile.

Essenzialmente trova il valore e la posizione del massimo su una matrice su colonne e li memorizza:

  • val_ptr: matrice di input: column-major (stile Fortran) n_rows-by-n_cols (dove tipicamente n_rows>>n_cols)
  • opt_pos_ptr : int vettore di lunghezza n_righe dove memorizzare la posizione del massimo. In entrata riempito con zeri.
  • max_ptr: vettore float di lunghezza n_righe dove memorizzare il massimo. In entrata riempito con copie della prima colonna di val_ptr
  • La funzione verrà chiamata in un ciclo parallelo
  • La regione di memoria è garantita per non sovrapporsi
  • Non ho davvero bisogno che max_ptr sia riempito, attualmente è usato solo per la contabilità e per evitare l'allocazione di memoria
  • Uso MSVC, C++ 17 su Windows 10. Pensato per eseguire moderne CPU Intel

Il codice, in cui il tipo di modello deve essere float o double:

template <typename eT>
find_max(const int n_cols, 
         const int n_rows, 
         const eT* val_ptr,
         int* opt_pos_ptr,
         eT* max_ptr){
    for (int col = 1; col < n_cols; ++col)
    {
        //Getting the pointer to the beginning of the column
        const auto* value_col = val_ptr + col * n_rows;
        //Looping over the rows
        for (int row = 0; row < n_rows; ++row)
        {
            //If the value is larger than the current maximum, we replace and we store its positions
            if (value_col[row] > max_ptr[row])
            {
                max_ptr[row] = value_col[row];
                opt_pos_ptr[row] = col;
            }
        }
    }
}

Cosa ho provato finora:

  • Ho provato a utilizzare OpenMP parallelo per il ciclo interno, ma porta qualcosa solo su righe molto grandi, leggermente più grandi del mio utilizzo attuale.
  • Il ciclo if in inner impedisce a #pragma omp simd di funzionare e non sono stato in grado di riscriverlo senza di esso.

Risposte

3 AndreySemashev Aug 15 2020 at 21:55

In base all'esempio di codice che hai pubblicato, sembra che tu voglia calcolare un valore massimo verticale, il che significa che nel tuo caso le "colonne" sono orizzontali. In C/C++ le sequenze orizzontali di elementi (cioè dove due elementi adiacenti hanno la distanza di un elemento in memoria) sono normalmente chiamate righe e verticali (dove due elementi adiacenti hanno la distanza della dimensione della riga in memoria) - colonne. Nella mia risposta qui sotto userò la terminologia tradizionale, dove le righe sono orizzontali e le colonne sono verticali.

Inoltre, per brevità, mi concentrerò su un possibile tipo di elemento matrice: float. L'idea di base è la stessa per double, con la differenza principale che è il numero di elementi per vettore e la selezione di _ps/ _pdintrinseci. Fornirò una versione per doublela fine.


L'idea è che puoi calcolare il massimo verticale per più colonne in parallelo usando _mm_max_ps/ _mm_max_pd. Per registrare anche la posizione del massimo trovato, è possibile confrontare il massimo precedente con gli elementi attuali. Il risultato del confronto è una maschera, dove gli elementi sono tutti uno dove il massimo è aggiornato. Tale maschera può essere utilizzata anche per selezionare quale posizione deve essere aggiornata.

Devo notare che l'algoritmo seguente presuppone che non sia importante quale posizione dell'elemento massimo sia registrata, se ci sono più elementi massimi uguali in una colonna. Inoltre, presumo che la matrice non contenga valori NaN, il che influenzerebbe i confronti. Ne parleremo più avanti.

void find_max(const int n_cols, 
         const int n_rows, 
         const float* val_ptr,
         int* opt_pos_ptr,
         float* max_ptr){
    const __m128i mm_one = _mm_set1_epi32(1);

    // Pre-compute the number of rows that can be processed in full vector width.
    // In a 128-bit vector there are 4 floats or 2 doubles
    int tail_size = n_rows & 3;
    int n_rows_aligned = n_rows - tail_size;
    int row = 0;
    for (; row < n_rows_aligned; row += 4)
    {
        const auto* col_ptr = val_ptr + row;
        __m128 mm_max = _mm_loadu_ps(col_ptr);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128 mm_value = _mm_loadu_ps(col_ptr);

            // See if this value is greater than the old maximum
            __m128 mm_mask = _mm_cmplt_ps(mm_max, mm_value);
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, _mm_castps_si128(mm_mask));

            // Compute the maximum
            mm_max = _mm_max_ps(mm_value, mm_max);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_ps(max_ptr + row, mm_max);
        _mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
    }

    // Process tail serially
    for (; row < n_rows; ++row)
    {
        const auto* col_ptr = val_ptr + row;
        auto max = *col_ptr;
        int max_pos = 0;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            auto value = *col_ptr;
            if (value > max)
            {
                max = value;
                max_pos = col;
            }

            col_ptr += n_rows;
        }

        max_ptr[row] = max;
        opt_pos_ptr[row] = max_pos;
    }
}

Il codice precedente richiede SSE4.1 a causa delle caratteristiche intrinseche di fusione. Puoi sostituirli con una combinazione di _mm_and_si128/ _ps, _mm_andnot_si128/ _pse _mm_or_si128/ _ps, nel qual caso i requisiti verranno abbassati a SSE2. Consultare Intel Intrinsics Guide per ulteriori dettagli sugli intrinseci specifici, incluse le estensioni del set di istruzioni necessarie.


Una nota sui valori NaN. Se la tua matrice può avere NaN, il _mm_cmplt_pstest restituirà sempre falso. Per quanto riguarda _mm_max_ps, generalmente non si sa cosa restituirà. L' maxpsistruzione in cui l'intrinseco traduce restituisce il suo secondo operando (sorgente) se uno degli operandi è un NaN, quindi disponendo gli operandi dell'istruzione è possibile ottenere entrambi i comportamenti. Tuttavia, non è documentato quale argomento dell'intrinseco _mm_max_psrappresenta quale operando dell'istruzione, ed è anche possibile che il compilatore possa utilizzare un'associazione diversa in casi diversi. Vedi questa risposta per maggiori dettagli.

Al fine di garantire il corretto comportamento wrt. NaNs è possibile utilizzare l'assembler inline per forzare l'ordine corretto degli maxpsoperandi. Sfortunatamente, questa non è un'opzione con MSVC per il target x86-64, che hai detto di utilizzare, quindi potresti riutilizzare il _mm_cmplt_psrisultato per una seconda fusione come questa:

// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, mm_mask);

Ciò eliminerà i NaN nei valori massimi risultanti. Se invece vuoi mantenere i NaN, puoi utilizzare un secondo confronto per rilevare i NaN:

// Detect NaNs
__m128 mm_nan_mask = _mm_cmpunord_ps(mm_value, mm_value);

// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, _mm_or_ps(mm_mask, mm_nan_mask));

Probabilmente potresti migliorare ulteriormente le prestazioni dell'algoritmo sopra se usi vettori più ampi ( __m256o __m512) e srotoli il ciclo esterno di un piccolo fattore, in modo che almeno una riga della cache di dati di riga venga caricata su ogni iterazione del ciclo interno.


Ecco un esempio di implementazione per double. Il punto importante da notare qui è che poiché ci sono solo due doubleelementi per vettore e ci sono ancora quattro posizioni per vettore, dobbiamo srotolare il ciclo esterno per elaborare due vettori di doublealla volta e quindi comprimere le due maschere dai confronti con il massimi precedenti per unire le posizioni a 32 bit.

void find_max(const int n_cols, 
         const int n_rows, 
         const double* val_ptr,
         int* opt_pos_ptr,
         double* max_ptr){
    const __m128i mm_one = _mm_set1_epi32(1);

    // Pre-compute the number of rows that can be processed in full vector width.
    // In a 128-bit vector there are 2 doubles, but we want to process
    // two vectors at a time.
    int tail_size = n_rows & 3;
    int n_rows_aligned = n_rows - tail_size;
    int row = 0;
    for (; row < n_rows_aligned; row += 4)
    {
        const auto* col_ptr = val_ptr + row;
        __m128d mm_max1 = _mm_loadu_pd(col_ptr);
        __m128d mm_max2 = _mm_loadu_pd(col_ptr + 2);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128d mm_value1 = _mm_loadu_pd(col_ptr);
            __m128d mm_value2 = _mm_loadu_pd(col_ptr + 2);

            // See if this value is greater than the old maximum
            __m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
            __m128d mm_mask2 = _mm_cmplt_pd(mm_max2, mm_value2);
            // Compress the 2 masks into one
            __m128i mm_mask = _mm_packs_epi32(
                _mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask2));
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);

            // Compute the maximum
            mm_max1 = _mm_max_pd(mm_value1, mm_max1);
            mm_max2 = _mm_max_pd(mm_value2, mm_max2);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_pd(max_ptr + row, mm_max1);
        _mm_storeu_pd(max_ptr + row + 2, mm_max2);
        _mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
    }

    // Process 2 doubles at once
    if (tail_size >= 2)
    {
        const auto* col_ptr = val_ptr + row;
        __m128d mm_max1 = _mm_loadu_pd(col_ptr);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128d mm_value1 = _mm_loadu_pd(col_ptr);

            // See if this value is greater than the old maximum
            __m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
            // Compress the mask. The upper half doesn't matter.
            __m128i mm_mask = _mm_packs_epi32(
                _mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask1));
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);

            // Compute the maximum
            mm_max1 = _mm_max_pd(mm_value1, mm_max1);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_pd(max_ptr + row, mm_max1);
        // Only store the lower two positions
        _mm_storel_epi64(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);

        row += 2;
    }

    // Process tail serially
    for (; row < n_rows; ++row)
    {
        const auto* col_ptr = val_ptr + row;
        auto max = *col_ptr;
        int max_pos = 0;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            auto value = *col_ptr;
            if (value > max)
            {
                max = value;
                max_pos = col;
            }

            col_ptr += n_rows;
        }

        max_ptr[row] = max;
        opt_pos_ptr[row] = max_pos;
    }
}