По сути, в мое приложение C ++ поступают данные, ориентированные на строки / потоковые данные (Netflow), и я хочу записать данные в файлы Parquet-gzip.
Глядя на пример программы reader-writer.cc в проекте parquet-cpp кажется, что я могу передавать данные только в parquet-cpp столбчатым способом:
constexpr int NUM_ROWS_PER_ROW_GROUP = 500;
...
// Append a RowGroup with a specific number of rows.
parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup(NUM_ROWS_PER_ROW_GROUP);
// Write the Bool column
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
bool_writer->WriteBatch(1, nullptr, nullptr, &value);
}
// Write the Int32 column
...
// Write the ... column
Кажется, это подразумевает, что мне нужно будет самостоятельно буферизовать строки NUM_ROWS_PER_ROW_GROUP, а затем перебрать их, чтобы передавать их в parquet-cpp по одному столбцу за раз. Я надеюсь, что есть лучший способ, так как он кажется неэффективным, поскольку данные нужно будет копировать дважды: один раз в мои буферы, затем снова при подаче данных в parquet-cpp по одному столбцу за раз.
Есть ли способ получить данные каждой строки в parquet-cpp без предварительной буферизации группы строк? Проект Apache Arrow (который использует parquet-cpp) имеет учебник, который показывает, как преобразовать построчные данные в таблицу со стрелками. Для каждой строки входных данных код добавляется к каждому построителю столбцов:
for (const data_row& row : rows) {
ARROW_RETURN_NOT_OK(id_builder.Append(row.id));
ARROW_RETURN_NOT_OK(cost_builder.Append(row.cost));
Я хотел бы сделать что-то подобное с parquet-cpp. Это возможно?
У вас никогда не будет вообще никакой буферизации, поскольку нам нужно преобразовать построчное представление в столбчатое. Наилучший путь на момент написания статьи — это создание таблиц Apache Arrow, которые затем подаются в parquet-cpp
,
parquet-cpp
предоставляет специальные API-интерфейсы Arrow, которые затем могут напрямую работать с этими таблицами, в основном без каких-либо дополнительных копий данных. Вы можете найти API в parquet/arrow/reader.h
а также parquet/arrow/writer.h
,
Оптимальное, но еще не реализованное решение может сэкономить несколько байтов, выполнив следующие действия:
Несмотря на то, что это оптимальное решение может сэкономить вам немного памяти, есть некоторые шаги, которые кто-то должен реализовать (не стесняйтесь вносить их или попросить помощи в их реализации), вы, вероятно, хорошо справляетесь с API на основе Apache Arrow.
Я последовал за совет @ xhochy использовать API-интерфейсы Arrow для заполнения таблицы Arrow при поступлении данных, а затем выписать таблицу, используя parquet-cpp
«s WriteTable()
метод. Я установил GZIP как сжатие по умолчанию, но указал SNAPPY для второго поля.
#include <iostream>
#include "arrow/builder.h"#include "arrow/table.h"#include "arrow/io/file.h"#include <parquet/arrow/writer.h>
#include <parquet/properties.h>
main() {
arrow::Int32Builder sip_builder(arrow::default_memory_pool());
arrow::Int32Builder dip_builder(arrow::default_memory_pool());
for(size_t i=0; i < 1000; i++) { // simulate row-oriented incoming data
sip_builder.Append(i*100);
dip_builder.Append(i*10 + i);
}
std::shared_ptr<arrow::Array> sip_array;
sip_builder.Finish(&sip_array);
std::shared_ptr<arrow::Array> dip_array;
dip_builder.Finish(&dip_array);
std::vector<std::shared_ptr<arrow::Field>> schema_definition = {
arrow::field("sip", arrow::int32(), false /* don't allow null; makes field required */),
arrow::field("dip", arrow::int32(), false)
};
auto schema = std::make_shared<arrow::Schema>(schema_definition);
std::shared_ptr<arrow::Table> arrow_table;
MakeTable(schema, {sip_array, dip_array}, &arrow_table);
std::shared_ptr<arrow::io::FileOutputStream> file_output_stream;
arrow::io::FileOutputStream::Open("test.parquet", &file_output_stream);
parquet::WriterProperties::Builder props_builder;
props_builder.compression(parquet::Compression::GZIP);
props_builder.compression("dip", parquet::Compression::SNAPPY);
auto props = props_builder.build();
parquet::arrow::WriteTable(*arrow_table, ::arrow::default_memory_pool(),
file_output_stream, sip_array->length(), props);
std::cout << "done" << std::endl;
}
$ g++ -std=c++11 -I/opt/parquet-cpp/build/release/include -lparquet -larrow arrow-test.cc; ./a.out
done
$ /opt/parquet-cpp/build/release/parquet_reader --only-metadata test.parquet
File Name: test.parquet
Version: 0
Created By: parquet-cpp version 1.2.1-SNAPSHOT
Total rows: 1000
Number of RowGroups: 1 <<----------
Number of Real Columns: 2
Number of Columns: 2
Number of Selected Columns: 2
Column 0: sip (INT32)
Column 1: dip (INT32)
--- Row Group 0 ---
--- Total Bytes 8425 ---
Rows: 1000---
Column 0
, Values: 1000, Null Values: 0, Distinct Values: 0
Max: 99900, Min: 0
Compression: GZIP, Encodings: PLAIN_DICTIONARY PLAIN RLE
Uncompressed Size: 5306, Compressed Size: 3109
Column 1
, Values: 1000, Null Values: 0, Distinct Values: 0
Max: 10989, Min: 0
Compression: SNAPPY, Encodings: PLAIN_DICTIONARY PLAIN RLE
Uncompressed Size: 5306, Compressed Size: 5316
Приведенный выше код записывает одну группу строк для всей таблицы / файла. В зависимости от того, сколько строк данных у вас есть, это может быть не идеально, так как слишком большое количество строк может привести к «возврату к простому кодированию» (см. Презентация Райана Блю, слайды 31-34). Чтобы записать несколько групп строк для таблицы / файла, установите chunk_size
аргумент меньше (ниже я делю на 2, чтобы получить две группы строк на таблицу / файл):
parquet::arrow::WriteTable(*arrow_table, ::arrow::default_memory_pool(),
fileOutputStream, sip_array->length()/2, props);
Это все еще не идеально. Все данные для файла должны быть буферизованы / сохранены в таблице Arrow перед вызовом parquet::arrow::WriteTable()
, так как эта функция открывает и закрывает файл. Я хочу написать несколько групп строк для каждого файла, но я хочу буферизовать / хранить только одну или две группы строк данных за один раз в памяти. Следующий код выполняет это. Он основан на коде в паркет / стрелок / writer.cc:
#include <parquet/util/memory.h>
...
auto arrow_output_stream = std::make_shared<parquet::ArrowOutputStream>(file_output_stream);
std::unique_ptr<parquet::arrow::FileWriter> writer;
parquet::arrow::FileWriter::Open(*(arrow_table->schema()), ::arrow::default_memory_pool(),
arrow_output_stream, props, parquet::arrow::default_arrow_writer_properties(),
&writer);
// write two row groups for the first table
writer->WriteTable(*arrow_table, sip_array->length()/2);
// ... code here would generate a new table ...
// for now, we'll just write out the same table again, to
// simulate writing more data to the same file, this
// time as one row group
writer->WriteTable(*arrow_table, sip_array->length());
writer->Close();
$ /opt/parquet-cpp/build/release/parquet_reader --only-metadata test.parquet File Name: test.parquet
Version: 0
Created By: parquet-cpp version 1.2.1-SNAPSHOT
Total rows: 2000000
Number of RowGroups: 3 <<--------
...
--- Row Group 0 ---
--- Total Bytes 2627115 ---
Rows: 500000---
...
--- Row Group 1 ---
--- Total Bytes 2626873 ---
Rows: 500000---
...
--- Row Group 2 ---
--- Total Bytes 4176371 ---
Rows: 1000000---