ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Mac OS 에서 Apache Nifi 사용해보기 #5 - 실습(날씨 데이터 수집)3
    Intelligent Product/Apache NiFi 2022. 10. 19. 10:17

    지난번 포스트까지는 날씨 데이터를 가져와서 그대로 파일로 저장하는 플로우를 작성해보았다.

     

    이번에는 데이터를 정제하여 RDB에 입력하는 프로세스를 작성해보려고 한다.

     

    이번에 작성한 Flow는 아래와 같다.

    InvokeHTTP : 기상청으로부터 데이터 요청한다.

    EvaluateJsonPath: flowFile content에 있는 JSON 데이터를 확인하여 flowFile Attribute 로 저장한다.

    ExecuteGroovyScript: groovy script를 실행시키며, 여기에 날씨데이터를 파싱하는 로직을 추가하여 데이터를 정제한다.

    PutSQL: RDB에 데이터 적재한다.

     

     

    1. InvokeHTTP

    - Run Schedule : 0 15 2/3 ? * * * (2시부터 3시간 간격으로 15분에 실행한다.)

    - HTTP Method: GET

    - HTTP URL : http://apis.data.go.kr/1360000/VilageFcstInfoService_2.0/getVilageFcst?serviceKey=[ServiceKey]&pageNo=1&numOfRows=50&dataType=JSON&base_date=${now():format("yyyyMMdd")}&base_time=${now():format("HH")}00&nx=57&ny=126

     
    2. EvaluateJsonPath

    - InvokeHTTP 에서 가져온 데이터에서 response.body.items.item을 content attribute 에 저장한다.

    {
        "response": {
            "header": {
                "resultCode": "00",
                "resultMsg": "NORMAL_SERVICE"
            },
            "body": {
                "dataType": "JSON",
                "items": {
                    "item": [
                        {
                            "baseDate": "20221018",
                            "baseTime": "0800",
                            "category": "TMP",
                            "fcstDate": "20221018",
                            "fcstTime": "0900",
                            "fcstValue": "8",
                            "nx": 57,
                            "ny": 126
                        },

    content에는 item 배열이 저장된다.

     

    3. ExecuteGroovyScript

    - Script Body 에 Groovy script를 입력하여 실행 할 수 있다.

    import groovy.json.JsonBuilder
    import groovy.json.JsonSlurper
    
    def flowFile = session.get()
    if(!flowFile) return
    
    def newFlowFile = session.create();
    
    slurper = new JsonSlurper()
    
    def transferJson = [:]
    
    def content = flowFile.getAttribute('content')
    def items = new JsonSlurper().parseText(content)
    def fcstTime = items[0].fcstTime.toString()
    def fcstDate = items[0].fcstDate.toString()
    def baseTime = items[0].baseTime.toString()
    def baseDate = items[0].baseDate.toString()
    def header = ['TMP', 'UUU', 'VVV', 'VEC', 'WSD', 'SKY', 'PTY', 'POP', 'WAV', 'PCP', 'REH', 'SNO', 'ANNC_DT', 'FCST_DT']
    
    items.each{it -> 
        if ((it.fcstTime == fcstTime) && (it.fcstDate == fcstDate) && header.contains(it.category)) {
            transferJson << [(it.category) : (it.fcstValue)]
        }
    }
    
    transferJson << [('ANNC_DT') : (baseDate+baseTime)]
    transferJson << [('FCST_DT') : (fcstDate+fcstTime)]
        
    def output = new JsonBuilder(transferJson).toPrettyString()
    
    newFlowFile.putAttribute("transform", output)
    
    header.each{ val->
        newFlowFile.putAttribute(val, transferJson[val])
    }
    
    newFlowFile.write("UTF-8", output);
    log.info(output)
    
    session.remove(flowFile);
    REL_SUCCESS << newFlowFile

    - 정제된 데이터 원본은 transform attribute에 저장하고 해당 내용들은 각 attribute로 만들어서 저장한다.

    - 결과를 확인하면 다음과 같다.

    - 정제된 데이터는 다음과 같다.

    {
        "TMP": "9",
        "UUU": "-0.1",
        "VVV": "-0.9",
        "VEC": "13",
        "WSD": "0.9",
        "SKY": "1",
        "PTY": "0",
        "POP": "0",
        "WAV": "0",
        "PCP": "\uac15\uc218\uc5c6\uc74c",
        "REH": "60",
        "SNO": "\uc801\uc124\uc5c6\uc74c",
        "ANNC_DT": "202210190800",
        "FCST_DT": "202210190900"
    }

     

    4. PutSQL

    - DB 연결을 위해서는 JDBC Connection Pool(Controller Service) 을 만들어 줘야한다. 해당 항목 우측의 화살표를 누르면 설정 화면으로 이동할 수 있다.

    - +를 누르면 새로 생성 가능하며 내용은 다음과 같이 설정하였다.

    - NiFi 에는 JDBC 드라이버가 없기 때문에 MySQL 공식 홈페이지에서 드라이버를 다운받아서 설치하였다.

    - 데이터 저장 쿼리

    INSERT INTO nifi.weather_forecast (annc_dt, fcst_dt, pcp, pop, pty, reh, sky, sno, tmp, uuu, vvv, vec, wsd, wav, created_datetime, updated_by, updated_datetime) VALUES('${ANNC_DT}', '${FCST_DT}', '${PCP}', '${POP}', '${PTY}', '${REH}', '${SKY}', '${SNO}', '${TMP}', '${UUU}', '${VVV}', '${VEC}', '${WSD}', '${WAV}', CURRENT_TIMESTAMP, 0, CURRENT_TIMESTAMP);

    tip) ${attribute_name} 의 표현식을 이용하여 flowFile의 Attribute 를 사용할 수 있다.

     

    * 결과

Designed by Tistory.