RcppParallel Распараллеливание вычисления расстояния: segfault

У меня есть матрица, для которой я хочу вычислить расстояние (скажем, евклидово) между яй ряд и каждый второй ряд (т. е. я хочу яСтрока матрицы парных расстояний).

#include <Rcpp.h>
#include <cmath>
#include <algorithm>
#include <RcppParallel.h>
//#include <RcppArmadillo.h>
#include <queue>

using namespace std;
using namespace Rcpp;
using namespace RcppParallel;

// [[Rcpp::export]]
double dist_fun(NumericVector row1, NumericVector row2){
double rval = 0;
for (int i = 0; i < row1.length(); i++){
rval += (row1[i] - row2[i]) * (row1[i] - row2[i]);
}
return rval;
}

// [[Rcpp::export]]
NumericVector dist_row(NumericMatrix mat, int i){
NumericVector row(mat.nrow());

NumericMatrix::Row row1 = mat.row(i - 1);
for (int j = 0; j < mat.nrow(); j++){
NumericMatrix::Row row2 = mat.row(j);

row(j) = dist_fun(row1, row2);
}
return row;
}

// [[Rcpp::depends(RcppParallel)]]
struct JsDistance: public Worker {

// input matrix to read from
const NumericMatrix mat;
int i;

// output vector to write to
NumericVector output;

// initialize from Rcpp input and output matrixes (the RMatrix class
// can be automatically converted to from the Rcpp matrix type)
JsDistance(const NumericMatrix mat, int i, NumericVector output)
: mat(mat), i(i), output(output) {}// function call operator that work for the specified range (begin/end)
void operator()(std::size_t begin, std::size_t end) {
NumericVector row1 = mat.row(i);
for (std::size_t j = begin; j < end; j++) {

NumericVector row2 = mat.row(j);
output[j] = dist_fun(row1, row2);
}
}
};

// [[Rcpp::export]]
NumericVector parallel_dist_row(NumericMatrix mat, int i) {

// allocate the matrix we will return
NumericVector output(mat.nrow());

// create the worker
JsDistance JsDistance(mat, i, output);

// call it with parallelFor
parallelFor(0, mat.nrow(), JsDistance);
return output;
}

Последовательным способом использования Rcpp является функция ‘row_dist’, как написано выше. Тем не менее, матрица, с которой я хочу работать, очень большая, поэтому я хочу распараллелить ее. Но тогда я столкнусь с ошибкой segfault, которую я не совсем понимаю, почему. Чтобы вызвать ошибку, вы можете запустить следующий код:

library(Rcpp)
library(RcppParallel)

setThreadOptions(numThreads = 20)set.seed(42)
X = matrix(rnorm(10000 * 400), 10000, 400)
sourceCpp("question.cpp")start1 = proc.time()
print(dist_row(X, 2)[1:30])
print(proc.time() - start1)

start2 = proc.time()
print(parallel_dist_row(X, 2)[1:30])
print(proc.time() - start2)

введите описание изображения здесь

Может кто-нибудь подсказать мне, что я сделал не так? Спасибо заранее за ваше время!

================================================== =====================

Редактировать:

inline double d(double a, double b){
return fabs(a - b);
}

// [[Rcpp::depends(RcppParallel)]
struct dtwDistance: public Worker {

// Input matrix to read from must be of the RMatrix<T> form
// if using Rcpp objects
const RMatrix<double> mat;
int i;

// Output vector to write to must be of the RVector<T> form
// if using Rcpp objects
RVector<double> output;

// initialize from Rcpp input and output matrixes (the RMatrix class
// can be automatically converted to from the Rcpp matrix type)
dtwDistance(const NumericMatrix mat, int i, NumericVector output)
: mat(mat), i(i - 1), output(output) {}

// Note the -1 ^^^^ to match results from prior function

// Function call operator to iterate over a specified range (begin/end)
void operator()(std::size_t begin, std::size_t end) {

RMatrix<double>::Row row1 = mat.row(i);

for (std::size_t j = begin; j < end; ++j) {

RMatrix<double>::Row row2 = mat.row(j);

size_t n = row1.length();
size_t m = row2.length();
NumericMatrix cost(n + 1, m + 1);

for (int ii = 1; ii <= n; ii++){
cost(i, 0) = numeric_limits<double>::infinity();
}

for (int jj = 1; jj <= m; jj++){
cost(0, j) = numeric_limits<double>::infinity();
}

for (int ii = 1; ii <= n; ii++){
for (int jj = 1; jj <= m; jj++){
double dist = d(row1[ii - 1], row2[jj - 1]);
cost(ii, jj) = dist + min(min(cost(ii - 1, jj), cost(ii, jj - 1)), cost(ii - 1, jj - 1));
//cout << ii << ", " << jj << ", " << cost(ii, jj) << "\n";
}
}
output[j] = cost(n, m);

}
}
};
// [[Rcpp::export]]
NumericVector parallel_dist_row_dtw(NumericMatrix mat, int i) {

// allocate the matrix we will return
//RMatrix<double> input(mat);
NumericVector y(mat.nrow());
//RVector<double> output(y);

// create the worker
dtwDistance dtwDistance(mat, i, y);

// call it with parallelFor
parallelFor(0, mat.nrow(), dtwDistance);
return y;
}

Расстояние, которое мне нужно было рассчитать, — это динамическое время деформации. Я реализовал это, как указано выше. Тем не менее, при запуске он выдаст предупреждение «дисбаланс стека». И после нескольких запусков будет сегфо. Мне интересно, в чем проблема сейчас.

Чтобы вызвать проблему, я сделал:

library(Rcpp)
library(RcppParallel)
setThreadOptions(numThreads = 4)
sourceCpp("scripts/chisq_dtw.cpp")
set.seed(42)
X = matrix(rnorm(1000), 100, 10)

parallel_dist_row_dtw(X, 1)
parallel_dist_row_dtw(X, 2)
parallel_dist_row_dtw(X, 3)
parallel_dist_row_dtw(X, 4)
parallel_dist_row_dtw(X, 5)

0

Решение

Проблема в том, что вы не используя потокобезопасную оболочку вокруг R объектов через RMatrix<T> а также RVector<T>, Эти классы важны из-за распараллеливания, выполняемого в фоновом потоке, который является областью, которая небезопасна для вызова API R или Rcpp. официальная документация подчеркивает это в Безопасные средства доступа раздел.

В частности, у нас есть:

Для обеспечения безопасного и удобного доступа к массивам, лежащим в основе векторов и матриц R, RcppParallel представляет несколько классов доступа:

RVector<T> — Wrap R векторов различных типов

RMatrix<T> — Wrap R матрицы различных типов (также включает в себя Row а также Column классы)

Чтобы создать потокобезопасный метод доступа для вектора или матрицы Rcpp, просто создайте экземпляр RVector или же RMatrix с этим.


Код Исправить

Таким образом, ваша работа может быть исправлена ​​путем переключения *Matrix в RMatrix<T> а также *Vector в RVector<T>,

struct JsDistance: public Worker {

// Input matrix to read from must be of the RMatrix<T> form
// if using Rcpp objects
const RMatrix<double> mat;
int i;

// Output vector to write to must be of the RVector<T> form
// if using Rcpp objects
RVector<double> output;

// initialize from Rcpp input and output matrixes (the RMatrix class
// can be automatically converted to from the Rcpp matrix type)
JsDistance(const NumericMatrix mat, int i, NumericVector output)
: mat(mat), i(i - 1), output(output) {}

// Note the -1 ^^^^ to match results from prior function

// Function call operator to iterate over a specified range (begin/end)
void operator()(std::size_t begin, std::size_t end) {

RMatrix<double>::Row row1 = mat.row(i);

for (std::size_t j = begin; j < end; ++j) {

RMatrix<double>::Row row2 = mat.row(j);

double rval = 0;
for (unsigned int k = 0; k < row1.length(); ++k) {
rval += (row1[k] - row2[k]) * (row1[k] - row2[k]);
}

output[j] = rval;

}
}
};

В частности, используемые здесь типы данных имеют вид RMatrix<double> даже для доступа к матрице.

Кроме того, в параллельной версии отсутствует i-1 заявление. Чтобы исправить это, я решил позаботиться об этом в конструкторе JSDistance,


Тестовое задание

set.seed(42)
X = matrix(rnorm(10000 * 400), 10000, 400)

start1 = proc.time()

print(dist_row(X, 2)[1:30])
# [1] 811.8873   0.0000 799.8153 810.1442 720.3232 730.6083 797.8441 781.8066 827.1511 834.1863 842.9392 850.2476 724.5842 673.1428 775.0994
# [16] 805.5752 804.9281 774.9770 799.7669 870.3187 815.1129 934.7581 726.1554 804.2097 758.4943 772.8931 806.6026 715.8257 847.8980 831.7555

print(proc.time() - start1)
# user  system elapsed
# 0.22    0.00    0.23

start2 = proc.time()

print(parallel_dist_row(X, 2)[1:30])
# [1] 811.8873   0.0000 799.8153 810.1442 720.3232 730.6083 797.8441 781.8066 827.1511 834.1863 842.9392 850.2476 724.5842 673.1428 775.0994
# [16] 805.5752 804.9281 774.9770 799.7669 870.3187 815.1129 934.7581 726.1554 804.2097 758.4943 772.8931 806.6026 715.8257 847.8980 831.7555

print(proc.time() - start2)
#   user  system elapsed
#   0.28    0.00    0.06

all.equal(parallel_dist_row(X, 2), dist_row(X, 2))
# [1] TRUE
3

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]