У меня есть редкий массив a
(в основном нули):
unsigned char a[1000000];
и я хотел бы создать массив b
индексов к ненулевым элементам a
используя инструкции SIMD для архитектуры Intel x64 с AVX2. Я ищу советы, как сделать это эффективно. В частности, существуют ли инструкции SIMD для получения положений последовательных ненулевых элементов в регистре SIMD, расположенных последовательно?
Пять методов для вычисления индексов ненулевых:
Полу векторизованный цикл: загрузите вектор SIMD с символами, сравните с нулем и примените маску перемещения. Используйте небольшой скалярный цикл, если любой из символов отличен от нуля
(также предложено @stgatilov). Это хорошо работает для очень разреженных массивов. функция arr2ind_movmsk
в приведенном ниже коде используются инструкции BMI1
для скалярной петли.
Векторизованный цикл: процессоры Intel Haswell и новее поддерживают наборы команд BMI1 и BMI2. ИМТ2 содержит
pext
инструкция (Извлечение параллельных битов, см. Ссылку на Википедию),
который оказывается полезным здесь. Увидеть arr2ind_pext
в коде ниже.
Классический скалярный цикл с оператором if: arr2ind_if
,
Скалярная петля без ветвей: arr2ind_cmov
,
Справочная таблица: @stgatilov показывает, что можно использовать таблицу поиска вместо pdep и другого целого числа
инструкции. Это может хорошо работать, однако таблица соответствия довольно велика: она не помещается в кэш L1.
Здесь не проверено. Смотрите также обсуждение Вот.
/*
gcc -O3 -Wall -m64 -mavx2 -fopenmp -march=broadwell -std=c99 -falign-loops=16 sprs_char2ind.c
example: Test different methods with an array a of size 20000 and approximate 25/1024*100%=2.4% nonzeros:
./a.out 20000 25
*/
#include <stdio.h>
#include <immintrin.h>
#include <stdint.h>
#include <omp.h>
#include <string.h>__attribute__ ((noinline)) int arr2ind_movmsk(const unsigned char * restrict a, int n, int * restrict ind, int * m){
int i, m0, k;
__m256i msk;
m0=0;
for (i=0;i<n;i=i+32){ /* Load 32 bytes and compare with zero: */
msk=_mm256_cmpeq_epi8(_mm256_load_si256((__m256i *)&a[i]),_mm256_setzero_si256());
k=_mm256_movemask_epi8(msk);
k=~k; /* Search for nonzero bits instead of zero bits. */
while (k){
ind[m0]=i+_tzcnt_u32(k); /* Count the number of trailing zero bits in k. */
m0++;
k=_blsr_u32(k); /* Clear the lowest set bit in k. */
}
}
*m=m0;
return 0;
}__attribute__ ((noinline)) int arr2ind_pext(const unsigned char * restrict a, int n, int * restrict ind, int * m){
int i, m0;
uint64_t cntr_const = 0xFEDCBA9876543210;
__m256i shft = _mm256_set_epi64x(0x04,0x00,0x04,0x00);
__m256i vmsk = _mm256_set1_epi8(0x0F);
__m256i cnst16 = _mm256_set1_epi32(16);
__m256i shf_lo = _mm256_set_epi8(0x80,0x80,0x80,0x0B, 0x80,0x80,0x80,0x03, 0x80,0x80,0x80,0x0A, 0x80,0x80,0x80,0x02,
0x80,0x80,0x80,0x09, 0x80,0x80,0x80,0x01, 0x80,0x80,0x80,0x08, 0x80,0x80,0x80,0x00);
__m256i shf_hi = _mm256_set_epi8(0x80,0x80,0x80,0x0F, 0x80,0x80,0x80,0x07, 0x80,0x80,0x80,0x0E, 0x80,0x80,0x80,0x06,
0x80,0x80,0x80,0x0D, 0x80,0x80,0x80,0x05, 0x80,0x80,0x80,0x0C, 0x80,0x80,0x80,0x04);
__m128i pshufbcnst = _mm_set_epi8(0x80,0x80,0x80,0x80,0x80,0x80,0x80,0x80, 0x0E,0x0C,0x0A,0x08,0x06,0x04,0x02,0x00);
__m256i i_vec = _mm256_setzero_si256();
m0=0;
for (i=0;i<n;i=i+16){
__m128i v = _mm_load_si128((__m128i *)&a[i]); /* Load 16 bytes. */
__m128i msk = _mm_cmpeq_epi8(v,_mm_setzero_si128()); /* Generate 16x8 bit mask. */
msk = _mm_srli_epi64(msk,4); /* Pack 16x8 bit mask to 16x4 bit mask. */
msk = _mm_shuffle_epi8(msk,pshufbcnst); /* Pack 16x8 bit mask to 16x4 bit mask. */
msk = _mm_xor_si128(msk,_mm_set1_epi32(-1)); /* Invert 16x4 mask. */
uint64_t msk64 = _mm_cvtsi128_si64x(msk); /* _mm_popcnt_u64 and _pext_u64 work on 64-bit general-purpose registers, not on simd registers.*/
int p = _mm_popcnt_u64(msk64)>>2; /* p is the number of nonzeros in 16 bytes of a. */
uint64_t cntr = _pext_u64(cntr_const,msk64); /* parallel bits extract. cntr contains p 4-bit integers. The 16 4-bit integers in cntr_const are shuffled to the p 4-bit integers that we want */
/* The next 7 intrinsics unpack these p 4-bit integers to p 32-bit integers. */
__m256i cntr256 = _mm256_set1_epi64x(cntr);
cntr256 = _mm256_srlv_epi64(cntr256,shft);
cntr256 = _mm256_and_si256(cntr256,vmsk);
__m256i cntr256_lo = _mm256_shuffle_epi8(cntr256,shf_lo);
__m256i cntr256_hi = _mm256_shuffle_epi8(cntr256,shf_hi);
cntr256_lo = _mm256_add_epi32(i_vec,cntr256_lo);
cntr256_hi = _mm256_add_epi32(i_vec,cntr256_hi);
_mm256_storeu_si256((__m256i *)&ind[m0],cntr256_lo); /* Note that the stores of iteration i and i+16 may overlap. */
_mm256_storeu_si256((__m256i *)&ind[m0+8],cntr256_hi); /* Array ind has to be large enough to avoid segfaults. At most 16 integers are written more than strictly necessary */
m0 = m0+p;
i_vec = _mm256_add_epi32(i_vec,cnst16);
}
*m=m0;
return 0;
}__attribute__ ((noinline)) int arr2ind_if(const unsigned char * restrict a, int n, int * restrict ind, int * m){
int i, m0;
m0=0;
for (i=0;i<n;i++){
if (a[i]!=0){
ind[m0]=i;
m0=m0+1;
}
}
*m=m0;
return 0;
}__attribute__((noinline)) int arr2ind_cmov(const unsigned char * restrict a, int n, int * restrict ind, int * m){
int i, m0;
m0=0;
for (i=0;i<n;i++){
ind[m0]=i;
m0=(a[i]==0)? m0 : m0+1; /* Compiles to cmov instruction. */
}
*m=m0;
return 0;
}__attribute__ ((noinline)) int print_nonz(const unsigned char * restrict a, const int * restrict ind, const int m){
int i;
for (i=0;i<m;i++) printf("i=%d, ind[i]=%d a[ind[i]]=%u\n",i,ind[i],a[ind[i]]);
printf("\n"); fflush( stdout );
return 0;
}__attribute__ ((noinline)) int print_chk(const unsigned char * restrict a, const int * restrict ind, const int m){
int i; /* Compute a hash to compare the results of different methods. */
unsigned int chk=0;
for (i=0;i<m;i++){
chk=((chk<<1)|(chk>>31))^(ind[i]);
}
printf("chk = %10X\n",chk);
return 0;
}int main(int argc, char **argv){
int n, i, m;
unsigned int j, k, d;
unsigned char *a;
int *ind;
double t0,t1;
int meth, nrep;
char txt[30];
sscanf(argv[1],"%d",&n); /* Length of array a. */
n=n>>5; /* Adjust n to a multiple of 32. */
n=n<<5;
sscanf(argv[2],"%u",&d); /* The approximate fraction of nonzeros in a is: d/1024 */
printf("n=%d, d=%u\n",n,d);
a=_mm_malloc(n*sizeof(char),32);
ind=_mm_malloc(n*sizeof(int),32);
/* Generate a pseudo random array a. */
j=73659343;
for (i=0;i<n;i++){
j=j*653+1;
k=(j & 0x3FF00)>>8; /* k is a pseudo random number between 0 and 1023 */
if (k<d){
a[i] = (j&0xFE)+1; /* Set a[i] to nonzero. */
}else{
a[i] = 0;
}
}
/* for (i=0;i<n;i++){if (a[i]!=0){printf("i=%d, a[i]=%u\n",i,a[i]);}} printf("\n"); */ /* Uncomment this line to print the nonzeros in a. */
char txt0[]="arr2ind_movmsk: ";
char txt1[]="arr2ind_pext: ";
char txt2[]="arr2ind_if: ";
char txt3[]="arr2ind_cmov: ";
nrep=10000; /* Repeat a function nrep times to make relatively accurate timings possible. */
/* With nrep=1000000: ./a.out 10016 4 ; ./a.out 10016 48 ; ./a.out 10016 519 */
/* With nrep=10000: ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 513 */
printf("nrep = \%d \n\n",nrep);
arr2ind_movmsk(a,n,ind,&m); /* Make sure that the arrays a and ind are read and/or written at least one time before benchmarking. */
for (meth=0;meth<4;meth++){
t0=omp_get_wtime();
switch (meth){
case 0: for(i=0;i<nrep;i++) arr2ind_movmsk(a,n,ind,&m); strcpy(txt,txt0); break;
case 1: for(i=0;i<nrep;i++) arr2ind_pext(a,n,ind,&m); strcpy(txt,txt1); break;
case 2: for(i=0;i<nrep;i++) arr2ind_if(a,n,ind,&m); strcpy(txt,txt2); break;
case 3: for(i=0;i<nrep;i++) arr2ind_cmov(a,n,ind,&m); strcpy(txt,txt3); break;
default: ;
}
t1=omp_get_wtime();
printf("method = %s ",txt);
/* print_chk(a,ind,m); */
printf(" elapsed time = %6.2f\n",t1-t0);
}
print_nonz(a, ind, 2); /* Do something with the results */
printf("density = %f %% \n\n",((double)m)/((double)n)*100); /* Actual nonzero density of array a. */
/* print_nonz(a, ind, m); */ /* Uncomment this line to print the indices of the nonzeros. */
return 0;
}
/*
With nrep=1000000:
./a.out 10016 4 ; ./a.out 10016 4 ; ./a.out 10016 48 ; ./a.out 10016 48 ; ./a.out 10016 519 ; ./a.out 10016 519
With nrep=10000:
./a.out 1000000 5 ; ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 52 ; ./a.out 1000000 513 ; ./a.out 1000000 513
*/
Код был протестирован с размером массива n = 10016 (данные помещаются в кэш L1) и n = 1000000, с
разные ненулевые плотности около 0,5%, 5% и 50%. Для точного определения времени функции назывались 1000000
и 10000 раз соответственно.
Time in seconds, size n=10016, 1e6 function calls. Intel core i5-6500
0.53% 5.1% 50.0%
arr2ind_movmsk: 0.27 0.53 4.89
arr2ind_pext: 1.44 1.59 1.45
arr2ind_if: 5.93 8.95 33.82
arr2ind_cmov: 6.82 6.83 6.82
Time in seconds, size n=1000000, 1e4 function calls.
0.49% 5.1% 50.1%
arr2ind_movmsk: 0.57 2.03 5.37
arr2ind_pext: 1.47 1.47 1.46
arr2ind_if: 5.88 8.98 38.59
arr2ind_cmov: 6.82 6.81 6.81
В этих примерах векторизованные циклы быстрее, чем скалярные циклы.
Производительность arr2ind_movmsk
во многом зависит от плотности a
, Это только
быстрее, чем arr2ind_pext
если плотность достаточно мала. Точка безубыточности также зависит от размера массива n
,
Функция ‘arr2ind_if’ явно страдает от неудачного предсказания ветвлений при 50% ненулевой плотности.
Если вы ожидаете, что число ненулевых элементов будет очень низким (то есть намного меньше 1%), то вы можете просто проверить каждый 16-байтовый блок на ненулевое:
int mask = _mm_movemask_epi8(_mm_cmpeq_epi8(reg, _mm_setzero_si128());
if (mask != 65535) {
//store zero bits of mask with scalar code
}
Если процент полезных элементов достаточно мал, стоимость ошибочно спрогнозированных ветвей и стоимость медленного скалярного кода внутри «если» будет незначительной.
Что касается хорошего общего решения, сначала рассмотрим реализацию потокового уплотнения в SSE. Он удаляет все нулевые элементы из байтового массива (идея взята из Вот):
__m128i shuf [65536]; //must be precomputed
char cnt [65536]; //must be precomputed
int compress(const char *src, int len, char *dst) {
char *ptr = dst;
for (int i = 0; i < len; i += 16) {
__m128i reg = _mm_load_si128((__m128i*)&src[i]);
__m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128());
int mask = _mm_movemask_epi8(zeroMask);
__m128i compressed = _mm_shuffle_epi8(reg, shuf[mask]);
_mm_storeu_si128((__m128i*)ptr, compressed);
ptr += cnt[mask]; //alternative: ptr += 16-_mm_popcnt_u32(mask);
}
return ptr - dst;
}
Как вы видите, (_mm_shuffle_epi8
+ справочная таблица) может творить чудеса. Я не знаю другого способа векторизации структурно сложного кода, такого как потоковое сжатие.
Теперь единственной оставшейся проблемой с вашим запросом является то, что вы хотите получить индексы. Каждый индекс должен храниться в 4-байтовом значении, поэтому порция из 16 входных байтов может генерировать до 64 байтов выходных данных, которые не помещаются в один регистр SSE.
Один из способов справиться с этим — честно распаковать вывод в 64 байта. Итак, вы замените reg
с константой (0,1,2,3,4, …, 15) в коде, затем распакуйте регистр SSE в 4 регистра и добавьте регистр с четырьмя i
ценности. Для этого потребуется гораздо больше инструкций: 6 инструкций по распаковке, 4 добавления и 3 магазина (один уже есть). Что касается меня, это огромные накладные расходы, особенно если вы ожидаете менее 25% ненулевых элементов.
В качестве альтернативы, вы можете ограничить число ненулевых байтов, обрабатываемых итерацией одного цикла, на 4, так что для вывода всегда достаточно одного регистра.
Вот пример кода:
__m128i shufMask [65536]; //must be precomputed
char srcMove [65536]; //must be precomputed
char dstMove [65536]; //must be precomputed
int compress_ids(const char *src, int len, int *dst) {
const char *ptrSrc = src;
int *ptrDst = dst;
__m128i offsets = _mm_setr_epi8(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15);
__m128i base = _mm_setzero_si128();
while (ptrSrc < src + len) {
__m128i reg = _mm_loadu_si128((__m128i*)ptrSrc);
__m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128());
int mask = _mm_movemask_epi8(zeroMask);
__m128i ids8 = _mm_shuffle_epi8(offsets, shufMask[mask]);
__m128i ids32 = _mm_unpacklo_epi16(_mm_unpacklo_epi8(ids8, _mm_setzero_si128()), _mm_setzero_si128());
ids32 = _mm_add_epi32(ids32, base);
_mm_storeu_si128((__m128i*)ptrDst, ids32);
ptrDst += dstMove[mask]; //alternative: ptrDst += min(16-_mm_popcnt_u32(mask), 4);
ptrSrc += srcMove[mask]; //no alternative without LUT
base = _mm_add_epi32(base, _mm_set1_epi32(dstMove[mask]));
}
return ptrDst - dst;
}
Недостатком этого подхода является то, что теперь каждая последующая итерация цикла не может начинаться до тех пор, пока ptrDst += dstMove[mask];
выполняется на предыдущей итерации. Таким образом, критический путь резко возрос. Аппаратная гиперпоточность или ее эмуляция вручную могут снять это наказание.
Итак, как вы видите, есть много вариантов этой основной идеи, и все они решают вашу проблему с разной степенью эффективности. Вы также можете уменьшить размер LUT, если он вам не нравится (опять же, за счет снижения производительности).
Этот подход не может быть полностью расширен на более широкие регистры (то есть AVX2 и AVX-512), но вы можете попытаться объединить инструкции нескольких последовательных итераций в одну инструкцию AVX2 или AVX-512, таким образом слегка увеличив пропускную способность.
Примечание: я не тестировал какой-либо код (потому что предварительный расчет LUT правильно требует заметных усилий).
Хотя набор инструкций AVX2 содержит много инструкций GATHER, но его производительность слишком низкая. И самый эффективный способ сделать это — обработать массив вручную.