반응형

0. 요구사항

현재 데이터는 고객 아이디 별로 Bucketing(CUST_CNTA_EXP_H 필드)되어 있어서, 하루 전 데이터를 export할 경우 해당 날짜가 아닌 데이터도 같이 추출됨 (export가 Document 단위로 이루어지기 때문)

 Hadoop으로 데이터를 가져가야 해서 CSV형태로 데이터를 추출해서 저장해야 함.

 

1. 구현 방법

  1. 하루 전 데이터를 aggregate pipeline으로 새로운 collection으로 저장

  2. 새로운 collection을 export

 

2. 구현 Shell

 

2.1 Linux Shell

 

#!/bin/bash
#
# Daily export job:
#   1. Runs make_for_hadoop.js in the mongo shell to materialize yesterday's
#      records into the "for_hadoop" collection.
#   2. Waits until the shell prints the "Collection created" marker.
#   3. Exports "for_hadoop" to <today>.csv with mongoexport.
# Everything is logged to /home/mongo/scripts/<today>.out.
#
# NOTE(review): the connection string embeds the password in clear text and is
# visible in the process list; consider moving it to a protected config file.

yesterDay=$(date -d yesterday +%Y-%m-%d)
toDay=$(date +%Y-%m-%d)

logFile="/home/mongo/scripts/$toDay.out"
mongoUri="mongodb://ceppadm:Pceppadm%21123@172.23.1.11:27017,172.23.1.12:27017,172.23.1.13:27019/pcepp?authSource=admin&replicaSet=pceppReplicaSet"

{
    echo "Start --------------------------"
    echo "Export $yesterDay data..... "
    date
    echo "--------------------------"
    echo " "
    echo "To make $yesterDay data to new collection "
    echo "  "
} >> "$logFile"


# Build the collection in the background; remember the PID so the wait loop
# below can tell "still running" apart from "died without finishing".
nohup /mongo_sw/ent/bin/mongo "$mongoUri" < /home/mongo/scripts/make_for_hadoop.js >> "$logFile" 2>&1 &
mongoPid=$!


echo " " >> "$logFile"
echo "Waiting...." >> "$logFile"


# Poll the log every 5 minutes for the completion marker printed by
# make_for_hadoop.js.
while true
do
    date >> "$logFile"
    if grep -qw -e 'Collection created' "$logFile"
    then
        break
    fi

    # The shell has exited: without this check a failed aggregation would
    # leave the loop spinning forever.
    if ! kill -0 "$mongoPid" 2>/dev/null
    then
        # Re-check once in case the marker was written right before exit.
        if grep -qw -e 'Collection created' "$logFile"
        then
            break
        fi
        echo "ERROR: mongo shell exited without creating the collection" >> "$logFile"
        exit 1
    fi

    sleep 300
done


{
    echo "---------------------------"
    date
    echo "Finished to create collection "
    echo "---------------------------"
    echo " "
} >> "$logFile"




{
    echo "Start export"
    date
    echo "---------------------------"
    echo " "
} >> "$logFile"

# The CSV is written relative to the current working directory.
nohup /mongo_sw/ent/bin/mongoexport -c for_hadoop -o "$toDay.csv" --type=csv -f "CUST_NO,YYMM,CUST_CNTA_EXP_H" --uri="$mongoUri" >> "$logFile" 2>&1 &

 

2.2 새로운 컬렉션을 만드는 MQL

몽고 DB는 기본으로 UTC 시간을 사용하는데, LG U+에서는 한국 시간으로 데이터를 넣고 있어서 한국 시간으로 변환이 필요하다.

하루 전 날짜는 현재 시각에서 (1000*60*60*24*1) 밀리초를 빼 주면 된다.

 

make_for_hadoop.js




// make_for_hadoop.js
//
// Copies yesterday's entries of the bucketed CUST_CNTA_EXP_H array from
// tb_cc_cust_cnta_exp_h into the "for_hadoop" collection (one output document
// per array entry) and prints a completion marker.
//
// Bare expressions (YD, yyyymm, ...) are intentional: when the script is fed
// to the mongo shell on stdin they echo their values into the job log.

// The same instant 24 hours ago (the offset is in milliseconds).
var YD = new Date(ISODate() - (1000*60*60*24*1));
YD

// Date parts as zero-padded strings. Keeping them strings matters: adding a
// numeric year to a numeric month (10-12) would do arithmetic instead of
// concatenation and yield e.g. 2034 rather than "202311".
// NOTE(review): getFullYear/getMonth/getDate use the shell host's local time
// zone - presumably KST here; confirm.
var yyyy = String(YD.getFullYear());
var mm = ('0' + (YD.getMonth() + 1)).slice(-2);
var dd = ('0' + YD.getDate()).slice(-2);

// Bucket key and the inclusive bounds of yesterday, compared as strings
// against SYS_CREATION_DATE. The upper bound ends in .999 so entries created
// during the last second of the day are not dropped.
var yyyymm = yyyy + mm;
var startTime = yyyy + '-' + mm + '-' + dd + ' 00:00:00.000+0900';
var endTime = yyyy + '-' + mm + '-' + dd + ' 23:59:59.999+0900';


yyyymm
startTime
endTime


db.tb_cc_cust_cnta_exp_h.aggregate([
    // Pre-filter buckets of that month holding at least one candidate entry.
    { $match : { YYMM : yyyymm , "CUST_CNTA_EXP_H.SYS_CREATION_DATE" : { $gte : startTime } } } ,
    // One document per array entry.
    { $unwind : "$CUST_CNTA_EXP_H" } ,
    // Keep only the entries created yesterday.
    { $match : { "CUST_CNTA_EXP_H.SYS_CREATION_DATE" : { $gte : startTime , $lte : endTime } } } ,
    // Drop _id and the bucket bookkeeping fields.
    { $project : { _id : 0 , HST_CNT : 0 , CUCT_DTTM_MIN : 0 , CUCT_DTTM_MAX : 0 , CUST_EXP_EXTR_DT : 0 } } ,
    // Write the result to the for_hadoop collection.
    { $out : "for_hadoop" }
] ,
{ allowDiskUse : true } )


db.for_hadoop.count({ })


// Marker the calling shell script greps for ("Collection created").
print ("Collection created...")

 

위와 같이 하면 CUST_CNTA_EXP_H Key의 값이 Sub Document Type으로 되어서 export 시에 이상하게 나온다.

아래와 같이 변경하여 Key : Value 형태로 다시 한번 변환해 줘야 한다.

 

make_for_hadoop.js




// make_for_hadoop.js (flattened variant)
//
// Copies yesterday's entries of the bucketed CUST_CNTA_EXP_H array from
// tb_cc_cust_cnta_exp_h into the "for_hadoop" collection, promoting the
// fields of each entry to the top level of the output document, and prints a
// completion marker.
//
// Bare expressions (YD, yyyymm, ...) are intentional: when the script is fed
// to the mongo shell on stdin they echo their values into the job log.

// The same instant 24 hours ago (the offset is in milliseconds).
var YD = new Date(ISODate() - (1000*60*60*24*1));
YD

// Date parts as zero-padded strings. Keeping them strings matters: adding a
// numeric year to a numeric month (10-12) would do arithmetic instead of
// concatenation and yield e.g. 2034 rather than "202311".
// NOTE(review): getFullYear/getMonth/getDate use the shell host's local time
// zone - presumably KST here; confirm.
var yyyy = String(YD.getFullYear());
var mm = ('0' + (YD.getMonth() + 1)).slice(-2);
var dd = ('0' + YD.getDate()).slice(-2);

// Bucket key and the inclusive bounds of yesterday, compared as strings
// against SYS_CREATION_DATE. The upper bound ends in .999 so entries created
// during the last second of the day are not dropped.
var yyyymm = yyyy + mm;
var startTime = yyyy + '-' + mm + '-' + dd + ' 00:00:00.000+0900';
var endTime = yyyy + '-' + mm + '-' + dd + ' 23:59:59.999+0900';


yyyymm
startTime
endTime


db.tb_cc_cust_cnta_exp_h.aggregate([
    // Pre-filter buckets of that month holding at least one candidate entry.
    { $match : { YYMM : yyyymm , "CUST_CNTA_EXP_H.SYS_CREATION_DATE" : { $gte : startTime } } } ,
    // One document per array entry.
    { $unwind : "$CUST_CNTA_EXP_H" } ,
    // Keep only the entries created yesterday.
    { $match : { "CUST_CNTA_EXP_H.SYS_CREATION_DATE" : { $gte : startTime , $lte : endTime } } } ,
    // Merge the entry's fields into the root document; on a name clash the
    // root document's value wins because it is listed last.
    { $replaceRoot : { newRoot : { $mergeObjects : [ "$CUST_CNTA_EXP_H" , "$$ROOT" ] } } } ,
    // Drop _id, the bucket bookkeeping fields and the now-redundant sub-document.
    { $project : { _id : 0 , HST_CNT : 0 , CUCT_DTTM_MIN : 0 , CUCT_DTTM_MAX : 0 , CUST_EXP_EXTR_DT : 0 , CUST_CNTA_EXP_H : 0 } } ,
    // Write the result to the for_hadoop collection.
    { $out : "for_hadoop" }
] ,
{ allowDiskUse : true } )


db.for_hadoop.count({ })


// Marker the calling shell script greps for ("Collection created").
print ("Collection created...")

 

$unwind 결과가 배열로 나오는지, Document로 나오는지에 따라서 다음과 같은 방법으로 상위로 올릴 수 있다.

 

// 결과가 Array 나올
{ $replaceRoot: { newRoot: { $mergeObjects: [ { $arrayElemAt: [ "$CUST_CNTA_EXP_H", 0 ] }, "$$ROOT" ] } } },
{ $project:{_id:0,CUST_CNTA_EXP_H:0}}


// 결과가 Document 나올
{ $replaceRoot: { newRoot: { $mergeObjects: [ $CUST_CNTA_EXP_H, "$$ROOT" ] } } },
{ $project:{_id:0,CUST_CNTA_EXP_H:0}}

 

반응형

+ Recent posts