I wrote the following code for a MapReduce task:
#include<algorithm>
#include<string>
#include<stdint.h>
#include<limits.h>
#include<sstream>
#include<vector>
#include<list>
#include<cstring>
#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"
using namespace std;
#define THRESHOLD 3//support/conf values
struct canTrie_t//trie structure for canTrie
{
string item;//item_name
int count;//its count
struct canTrie_t* nextItems[26];//pointers to other items
struct canTrie_t* up;//parent
};
canTrie_t* createNewCanTrie()//constructor for the root node
{
canTrie_t* canTrie=new canTrie_t;//new, not malloc: the struct holds a std::string
canTrie->item="@";//sentinel item for the root
canTrie->count=0;
canTrie->up=NULL;
for(int i=0;i<26;i++)
{
canTrie->nextItems[i]=NULL;
}
return canTrie;
}
void insert(canTrie_t* &canTrie,string itemString,list<canTrie_t*>* leafList,int (&count)[26])//leafList points to 26 per-item lists
{
canTrie_t *tmp_node, *new_node;
tmp_node = canTrie;
for(int i=0;i<itemString.size();i++)
{
count[(itemString[i]-'A')]++;//increment count of the item
new_node=tmp_node->nextItems[(itemString[i]-'A')];//which branch to move to
if(new_node==NULL)//the item being discovered for the first time here
{
new_node=new canTrie_t;//new, not malloc: canTrie_t holds a std::string
new_node->item=itemString[i];
new_node->count=1;
for(int j=0;j<26;j++)//initialize all child pointers
{
new_node->nextItems[j]=NULL;
}
new_node->up=tmp_node;
tmp_node=new_node;
}
else
{
new_node->count++;
tmp_node=new_node;
}
if(i==(itemString.size()-1))//leaf node
{
leafList[itemString[i]-'A'].push_back(tmp_node);//push that pointer into it
}
}
}
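//For context, a hypothetical example of what insert() builds: after inserting
//the sorted transactions "ABC" and then "ABD", the trie is
//  @ -> A(2) -> B(2) -> C(1)
//                    -> D(1)
//count[] holds A=2,B=2,C=1,D=1, and pointers to the C and D leaves end up in
//leafList[2] and leafList[3].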
void mineFreqItems(canTrie_t* &canTrie,list<canTrie_t*> *leafList,int (&count)[26],vector<string> &freqItemsets)
{
canTrie_t* tmp_node;
list<canTrie_t*>::iterator iter;//an iterator for the leafnodes belonging to a particular item
for(int i=0;i<26;i++)
{
if(count[i]>=THRESHOLD)
{
freqItemsets.push_back(string(1,(char)(i+'A')));//emit each frequent single item
//outFile<<(char)(i+'A')<<endl;//for each item first!
}
for (iter = leafList[i].begin(); iter != leafList[i].end(); ++iter)
{
tmp_node=(*iter);
while(tmp_node!=NULL && tmp_node->count<(THRESHOLD))tmp_node=tmp_node->up;//climb until the count clears the threshold
if(tmp_node!=NULL)
{
while(tmp_node->up!=NULL)//stop before the "@" root sentinel
{
freqItemsets.push_back(tmp_node->item);
//outFile<<tmp_node->item<<" ";//once they surpass the threshold for support and confidence
tmp_node=tmp_node->up;
}
}
}
}
}
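//To spell out the intent: with THRESHOLD 3, mineFreqItems first records every
//single item whose global count reaches 3, then for each leaf climbs to the
//first ancestor whose count is at least 3 and records the items on the path
//from that ancestor up to (but not including) the "@" root.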
class partitionMapper : public HadoopPipes::Mapper {
public:
partitionMapper(HadoopPipes::TaskContext& context) {
}
void map(HadoopPipes::MapContext& context) {
string line=context.getInputValue();
string buf; // Have a buffer string
stringstream ss(line); // Insert the string into a stream
vector<string> attributes; // Create vector to hold our words
while (ss >> buf)
attributes.push_back(buf);
string ageText=attributes[1];
size_t foundMidAged = ageText.find("middle-aged");
size_t foundYoung=ageText.find("young");
string raceText=attributes[8];
size_t foundWhite=raceText.find("White");
size_t foundBlack=raceText.find("Black");
bool foundOtherRace=false;//must be initialized; only set when neither White nor Black matches
if(foundWhite==string::npos && foundBlack==string::npos)foundOtherRace=true;
string sexText=attributes[9];
size_t foundMale=sexText.find("Male");
size_t foundFemale=sexText.find("Female");
string salaryText=attributes[14];
size_t foundRich=salaryText.find(">50");
size_t foundPoor=salaryText.find("<=50");
if(foundMidAged!=string::npos && foundBlack!=string::npos && foundMale!=string::npos && foundPoor!=string::npos)
{
string key="MidAged Black Male Poor";
context.emit(key, attributes[0]);
}
else if(foundYoung!=string::npos && foundBlack!=string::npos && foundFemale!=string::npos && foundPoor!=string::npos)
{
string key="Young Black Female Poor";
context.emit(key, attributes[0]);
}
else if(foundYoung!=string::npos && foundWhite!=string::npos && foundMale!=string::npos && foundRich!=string::npos)
{
string key="Young White Male Rich";
context.emit(key, attributes[0]);
}
else if(foundMidAged!=string::npos && foundWhite!=string::npos && foundFemale!=string::npos && foundPoor!=string::npos)
{
string key="MidAged White Female Poor";
context.emit(key, attributes[0]);
}
else if(foundMidAged!=string::npos && foundOtherRace && foundFemale!=string::npos && foundPoor!=string::npos)
{
string key="MidAged Asian Female Poor";
context.emit(key, attributes[0]);
}
else if(foundYoung!=string::npos && foundOtherRace && foundMale!=string::npos && foundRich!=string::npos)
{
string key="Young Asian Male Rich";
context.emit(key, attributes[0]);
}
}
};
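//For reference, a hypothetical input record for map() above (my data is a
//whitespace-separated, discretized census-style file; token 0 is the record
//id, token 1 the age bucket, token 8 the race, token 9 the sex, and token 14
//the salary class):
//  id42 middle-aged ... Black Male ... <=50K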
vector<canTrie_t*> canTrees;//file-scope so reduce() can store each partition's trie (TODO: use a map)
class partitionReducer : public HadoopPipes::Reducer {
public:
partitionReducer(HadoopPipes::TaskContext& context) {
}
void reduce(HadoopPipes::ReduceContext& context) {
canTrie_t* canTrie=createNewCanTrie();
list<canTrie_t*> *leafList=new list<canTrie_t*>[26];
int count[26];//count of # of items
memset(count,0,sizeof(count));//setting count array to 0
string itemString;//every transaction instance
while (context.nextValue()) {
itemString=HadoopUtils::toString(context.getInputValue());
sort(itemString.begin(), itemString.end());
insert(canTrie,itemString,leafList,count);
}
vector<string> freqItemsets;
mineFreqItems(canTrie,leafList,count,freqItemsets);
for(size_t i=0;i<freqItemsets.size();i++)
{
context.emit(context.getInputKey(), HadoopUtils::toString(freqItemsets[i])); //convert to set/treemap/arraylist
}
canTrees.push_back(canTrie);//TODO: use a map keyed by partition instead
}
};
int main(int argc, char *argv[]) {
return HadoopPipes::runTask(HadoopPipes::TemplateFactory<partitionMapper,
partitionReducer>());
}
When I compile the code with the following makefile:
CC = g++
HADOOP_INSTALL = /home/hadoop/hadoop
PLATFORM = Linux-i386-32
CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include
partitionMR: partitionMR.cpp
	$(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes \
	-lhadooputils -lpthread -g -O2 -o $@
it fails with the following error:

partitionMR.cpp: In function `partitionReducer::reduce(HadoopPipes::ReduceContext&)':
partitionMR.cpp:(.text._ZN16partitionReducer6reduceERN11HadoopPipes13ReduceContextE[partitionReducer::reduce(HadoopPipes::ReduceContext&)]+0x1a9): undefined reference to `HadoopUtils::toString(int)'
collect2: ld returned 1 exit status
make: *** [partitionMR] Error 1
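For what it's worth, the only places I call HadoopUtils::toString are the two lines in reduce() above, and both arguments (context.getInputValue() and freqItemsets[i]) are already std::string. My unverified guess is that the conversion can simply be dropped, i.e. those two lines would become:

itemString=context.getInputValue();//already a std::string, no conversion
context.emit(context.getInputKey(), freqItemsets[i]);//emit the string directly

but I would like to understand why this fails at link time rather than at compile time.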
Any hints about the error I'm getting? Please help! Thanks!