반응형

빅데이터 강의중 kafka에 대한 실습 설명입니다

포스팅 기준 2.3.0 버젼이 최신이라 2.3.0 버젼으로 진행합니다

보통은 클러스터를 해야하지만 간단하게 Kafka가 무엇인지 따라해보고 실습하는게 목적이기 때문에 단일 서버에서 진행합니다

python3 kafka package 설치

confluent_kafka라는 python3 package를 사용하겠습니다

# confluent_kafka 설치
$ python3 -m pip install confluent_kafka

 

 

producing code 짜기

간단한 python producing code

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})
def delivery_report(self, err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
for i in range(11):
    producer.poll(0)
    #(topic 이름, data 내용 , callback 함수)
    producer.produce("bigdata", str(i).encode('utf-8'), callback=delivery_report)
    print(str(i)+" is producing now")
    producer.flush() # flush - db commit 같은것!

kafka consumer로 확인해보기

$ cd kafka_2.12-2.3.0
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bigdata --from-beginning
# --from-beginning 처음부터 소비하겠다는 옵션

결과
0
1
2
3
4
5
6
7
8
9
10

consuming code 짜기

간단한 python consuming code

from confluent_kafka import Consumer
consumer = Consumer({

    'bootstrap.servers': 'localhost:9092',

    'group.id': 'console-consumer-37587',

    'default.topic.config': {

        'auto.offset.reset': 'smallest'

    }

})

consumer.subscribe(['bigdata'])
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    print('Received message: {}'.format(msg.value().decode('utf-8')))
consumer.close()


결과
Received message: test
Received message: test2
Received message: test3

consumer group id 찾기

$ cd kafka_2.12-2.3.0
$ bin/kafka-consumer-groups.sh  --list --bootstrap-server localhost:9092

결과
console-consumer-27273

 

배웠던 크롤링 코드로 producing 해보기

news 제목 크롤링 코드 producing 해보기

from confluent_kafka import Producer
import requests
from bs4 import BeautifulSoup
res = requests.get("http://news.naver.com")

soup = BeautifulSoup(res.text, "html.parser")
data = soup.select("div.com_list > div > ul > li> a > strong")

producer = Producer({'bootstrap.servers': 'localhost:9092'})
def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
for item in data:
    print(item.string)
    producer.produce("bigdata", item.string.encode('utf-8'), callback=delivery_report)
producer.flush()


결과
‘장관 패싱’ 직보받은 靑안보실… ‘北목선 귀순’ 이어 또 월권 논란
반환점 돈 文정부… 與 “삶의 질 향상” 野 “암흑의 시간”
적진 출마하는 중진들… 잘되면 바람몰이, 낙선땐 나락으로
초·재선도 청년들도…무르익는 보수대통합 분위기
심상정 “이자스민 입당, 내가 권유…黃 ‘임금차별’ 발언 때 한국당 정리 생각도”
올들어 재정적자 57조 ‘사상 최대’
추가 해명에도… 분양가상한제 논란 더 커져
트럼프 "대중국 관세철회 합의하지 않았다"(종합)
사우디 아람코 기업공개, 지분 5%만 파는 ‘찻잔 속 개혁’
분상제 후폭풍…“당첨차익 10억” 청약 열풍, 새 아파트 꿈틀
실리콘밸리서 정의선이 던진 말, 항공택시·스마트시티
연간 250만 명 넘게 찾아...영남알프스 9봉 등정 인기
'그것이 알고 싶다' 부산 미제전담팀의 1번 사건의 진실은
아시아나 인수, 현대산업이 애경보다 6000억 더 썼다
외고·국제고·자사고, 일반고 일괄 전환에 1조500억 든다
연말 수놓을 별똥별 잔치 시작됐다
[설채현의 '왜 이러는 개냥'] 4화 '꾸꾸의 이중생활'
방송사와 기획사 유착 확인..."CJENM 규제 필요"
“혐한 부채질해도 우린 간다”…한국 찾는 일본인 되레 늘어
노년기 운동은 과유불급, 능력치의 ‘55% 미학’ 지키자
마크롱 "트럼프에게 동맹은 상업적 대상… 나토 뇌사상태"
브라질 심해유전 입찰 참여 기대 밑돌아…中 덕분에 체면치레
잇단 마약카르텔 총격에 멕시코 대통령 지지율도 '흔들'
유럽증시, 미중 무역협상 상충되는 주장에 하락 마감
홍콩 디즈니랜드, 시위 사태에 직격탄…경제 전반 타격
[TF확대경] LG전자, '로봇명가' 거듭난다…사업 비전 알리기 총력
‘갤폴드’ 온라인 매장서 2초 만에 완판... 삼성, 중국시장서 자존심 회복 ‘청신호’
'원전 핵폐기물 관리' 원점으로…전문가 검토그룹 출범
[핫클립]고무줄 머신미니건
택시 동승 중개 '반반택시' 100일…"승객도, 택시 기사도 이득"
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]

(실습) 실시간 검색어 producing 해보기

import requests
from confluent_kafka import Producer
from bs4 import BeautifulSoup
producer = Producer({'bootstrap.servers': 'localhost:9092'})
def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
res = requests.get("http://www.naver.com")
soup = BeautifulSoup(res.text, "html.parser")
title_data = soup.find("h1")
a_list = soup.select("span.ah_k")
i=1
for item in a_list:
    print(i, item.string)
    i+=1
    producer.produce("bigdata", item.string.encode('utf-8'), callback=delivery_report)
producer.flush()


결과
1 순수의시대
2 송은이표 강조
3 호스틸
4 영화 호스틸
5 강한나
6 대성리역
7 영화 순수의시대
8 썸바디2 목격담
9 궁금한이야기y
10 장준혁
11 택시 할증시간
12 설인아
13 썸바디
14 박원숙아들
15 심상덕
16 윤지오
17 범인은 바로 너 시즌2
18 귀멸의 칼날
19 썸바디2
20 배가본드 결방
21 순수의시대
22 송은이표 강조
23 호스틸
24 영화 호스틸
25 강한나
26 대성리역
27 영화 순수의시대
28 썸바디2 목격담
29 궁금한이야기y
30 장준혁
31 택시 할증시간
32 설인아
33 썸바디
34 박원숙아들
35 심상덕
36 윤지오
37 범인은 바로 너 시즌2
38 귀멸의 칼날
39 썸바디2
40 배가본드 결방
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]
Message delivered to bigdata [0]

두 코드 모두 consuming 확인은 위의 consuming python 코드로 확인 할 수 있습니다

프로젝트 - 부동산 데이터로 가지고 producing 및 consuming 해보기

  • 강의 끝나고 코드 올라갑니다

table 하나 만들기

부동산 데이터 insert 해주는 sql문 producing 하기

sql문을 consuming 해서 table에 넣어보기

'Cloud > Kafka' 카테고리의 다른 글

Kafka 따라해보기 - 1 (설치하기 및 실행하기)  (0) 2019.11.09
Kafka 따라해보기 - 0 (설명)  (0) 2019.11.09
  • 네이버 블러그 공유하기
  • 네이버 밴드에 공유하기
  • 페이스북 공유하기
  • 카카오스토리 공유하기