оптимизация максимума по столбцам с помощью SIMD

Aug 15 2020

У меня есть эта функция, в которой я потратил значительное количество времени на свой код, и я хотел бы оптимизировать ее с помощью встроенных функций векторизации-SIMD-компилятора, если это возможно.

По сути, он находит значение и расположение максимума по матрице по столбцам и сохраняет их:

  • val_ptr: входная матрица: основной столбец (стиль Fortran) n_rows-by-n_cols (где обычно n_rows >> n_cols)
  • opt_pos_ptr: вектор int длины n_rows, где сохранить положение максимума. При входе заполняется нулями.
  • max_ptr: вектор с плавающей запятой длиной n_rows, где хранить максимум. При записи заполняется копиями первого столбца val_ptr
  • Функция будет вызываться в параллельном цикле
  • Гарантируется, что область памяти не перекрывается
  • Мне действительно не нужно заполнять max_ptr, в настоящее время он используется только для бухгалтерского учета и во избежание выделения памяти
  • Я использую MSVC, C ++ 17 в Windows 10. Предназначен для работы с современными процессорами Intel.

Код, в котором тип шаблона должен быть float или double:

template <typename eT>
find_max(const int n_cols, 
         const int n_rows, 
         const eT* val_ptr,
         int* opt_pos_ptr,
         eT* max_ptr){
    for (int col = 1; col < n_cols; ++col)
    {
        //Getting the pointer to the beginning of the column
        const auto* value_col = val_ptr + col * n_rows;
        //Looping over the rows
        for (int row = 0; row < n_rows; ++row)
        {
            //If the value is larger than the current maximum, we replace and we store its positions
            if (value_col[row] > max_ptr[row])
            {
                max_ptr[row] = value_col[row];
                opt_pos_ptr[row] = col;
            }
        }
    }
}

Что я пробовал до сих пор:

  • Я попытался использовать OpenMP parallel for во внутреннем цикле, но что-то принесло только очень большие строки, немного больше, чем мое текущее использование.
  • Внутренний цикл if in препятствует работе #pragma omp simd, и я не смог его переписать без него.

Ответы

3 AndreySemashev Aug 15 2020 at 21:55

Судя по опубликованному вами образцу кода, похоже, что вы хотите вычислить максимальное значение по вертикали, что означает, что в вашем случае «столбцы» горизонтальны. В C / C ++ горизонтальные последовательности элементов (т.е. где два соседних элемента имеют расстояние в один элемент в памяти) обычно называются строками, а вертикальные (где два соседних элемента имеют расстояние в размер строки в памяти) - столбцами. В своем ответе ниже я буду использовать традиционную терминологию, в которой строки горизонтальны, а столбцы вертикальны.

Также для краткости остановлюсь на одном возможном типе матричного элемента - float. Основная идея такая же double, с основным отличием в количестве элементов на вектор и выборе _ps/ _pdintrinsics. В doubleконце предоставлю версию .


Идея состоит в том, что вы можете вычислить вертикальный максимум для нескольких столбцов параллельно, используя _mm_max_ps/ _mm_max_pd. Чтобы также записать положение найденного максимума, вы можете сравнить предыдущий максимум с текущими элементами. Результатом сравнения является маска, где элементы - все единицы, где обновляется максимум. Эту маску можно использовать для выбора позиции, которую также необходимо обновить.

Я должен отметить, что приведенный ниже алгоритм предполагает, что не важно, какая максимальная позиция элемента записывается, если в столбце есть несколько равных максимальных элементов. Кроме того, я предполагаю, что матрица не содержит значений NaN, которые могут повлиять на сравнения. Подробнее об этом позже.

void find_max(const int n_cols, 
         const int n_rows, 
         const float* val_ptr,
         int* opt_pos_ptr,
         float* max_ptr){
    const __m128i mm_one = _mm_set1_epi32(1);

    // Pre-compute the number of rows that can be processed in full vector width.
    // In a 128-bit vector there are 4 floats or 2 doubles
    int tail_size = n_rows & 3;
    int n_rows_aligned = n_rows - tail_size;
    int row = 0;
    for (; row < n_rows_aligned; row += 4)
    {
        const auto* col_ptr = val_ptr + row;
        __m128 mm_max = _mm_loadu_ps(col_ptr);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128 mm_value = _mm_loadu_ps(col_ptr);

            // See if this value is greater than the old maximum
            __m128 mm_mask = _mm_cmplt_ps(mm_max, mm_value);
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, _mm_castps_si128(mm_mask));

            // Compute the maximum
            mm_max = _mm_max_ps(mm_value, mm_max);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_ps(max_ptr + row, mm_max);
        _mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
    }

    // Process tail serially
    for (; row < n_rows; ++row)
    {
        const auto* col_ptr = val_ptr + row;
        auto max = *col_ptr;
        int max_pos = 0;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            auto value = *col_ptr;
            if (value > max)
            {
                max = value;
                max_pos = col;
            }

            col_ptr += n_rows;
        }

        max_ptr[row] = max;
        opt_pos_ptr[row] = max_pos;
    }
}

Для приведенного выше кода требуется SSE4.1 из-за встроенных функций смешивания. Вы можете заменить их комбинацией _mm_and_si128/ _ps, _mm_andnot_si128/ _psи _mm_or_si128/ _ps, и в этом случае требования будут снижены до SSE2. См. Руководство по встроенным функциям Intel для получения дополнительных сведений о конкретных встроенных функциях, в том числе о том, какие расширения набора команд им требуются.


Замечание о значениях NaN. Если ваша матрица может иметь NaN, _mm_cmplt_psтест всегда будет возвращать false. Что касается _mm_max_ps, то вообще неизвестно, что он вернет. maxpsИнструкции , что внутренние сдвиги в доходности его второй (источник) операнд , если один из операндов является NaN, поэтому, организуя операнды команды можно достичь либо поведения. Однако не задокументировано, какой аргумент _mm_max_psвстроенной функции представляет какой операнд инструкции, и даже возможно, что компилятор может использовать разные ассоциации в разных случаях. См. Этот ответ для получения более подробной информации.

Чтобы обеспечить правильное поведение по отношению к. NaNs вы можете использовать встроенный ассемблер, чтобы установить правильный порядок maxpsоперандов. К сожалению, это не вариант с MSVC для цели x86-64, которую вы сказали, что используете, поэтому вместо этого вы можете повторно использовать _mm_cmplt_psрезультат для второй смеси, например:

// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, mm_mask);

Это подавит NaN в результирующих максимальных значениях. Если вместо этого вы хотите сохранить NaN, вы можете использовать второе сравнение для определения NaN:

// Detect NaNs
__m128 mm_nan_mask = _mm_cmpunord_ps(mm_value, mm_value);

// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, _mm_or_ps(mm_mask, mm_nan_mask));

Вероятно, вы могли бы еще больше повысить производительность описанного выше алгоритма, если бы использовали более широкие векторы ( __m256или __m512) и развернули внешний цикл с небольшим коэффициентом, чтобы по крайней мере строка данных строки кеша загружалась на каждой итерации внутреннего цикла.


Вот пример реализации double. Здесь важно отметить, что, поскольку в doubleкаждом векторе есть только два элемента и все еще есть четыре позиции на вектор, мы должны развернуть внешний цикл для обработки двух векторов за doubleраз, а затем сжать две маски из сравнений с предыдущие максимумы для смешивания 32-битных позиций.

void find_max(const int n_cols, 
         const int n_rows, 
         const double* val_ptr,
         int* opt_pos_ptr,
         double* max_ptr){
    const __m128i mm_one = _mm_set1_epi32(1);

    // Pre-compute the number of rows that can be processed in full vector width.
    // In a 128-bit vector there are 2 doubles, but we want to process
    // two vectors at a time.
    int tail_size = n_rows & 3;
    int n_rows_aligned = n_rows - tail_size;
    int row = 0;
    for (; row < n_rows_aligned; row += 4)
    {
        const auto* col_ptr = val_ptr + row;
        __m128d mm_max1 = _mm_loadu_pd(col_ptr);
        __m128d mm_max2 = _mm_loadu_pd(col_ptr + 2);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128d mm_value1 = _mm_loadu_pd(col_ptr);
            __m128d mm_value2 = _mm_loadu_pd(col_ptr + 2);

            // See if this value is greater than the old maximum
            __m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
            __m128d mm_mask2 = _mm_cmplt_pd(mm_max2, mm_value2);
            // Compress the 2 masks into one
            __m128i mm_mask = _mm_packs_epi32(
                _mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask2));
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);

            // Compute the maximum
            mm_max1 = _mm_max_pd(mm_value1, mm_max1);
            mm_max2 = _mm_max_pd(mm_value2, mm_max2);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_pd(max_ptr + row, mm_max1);
        _mm_storeu_pd(max_ptr + row + 2, mm_max2);
        _mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
    }

    // Process 2 doubles at once
    if (tail_size >= 2)
    {
        const auto* col_ptr = val_ptr + row;
        __m128d mm_max1 = _mm_loadu_pd(col_ptr);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128d mm_value1 = _mm_loadu_pd(col_ptr);

            // See if this value is greater than the old maximum
            __m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
            // Compress the mask. The upper half doesn't matter.
            __m128i mm_mask = _mm_packs_epi32(
                _mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask1));
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);

            // Compute the maximum
            mm_max1 = _mm_max_pd(mm_value1, mm_max1);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_pd(max_ptr + row, mm_max1);
        // Only store the lower two positions
        _mm_storel_epi64(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);

        row += 2;
    }

    // Process tail serially
    for (; row < n_rows; ++row)
    {
        const auto* col_ptr = val_ptr + row;
        auto max = *col_ptr;
        int max_pos = 0;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            auto value = *col_ptr;
            if (value > max)
            {
                max = value;
                max_pos = col;
            }

            col_ptr += n_rows;
        }

        max_ptr[row] = max;
        opt_pos_ptr[row] = max_pos;
    }
}