Apache Kafka-빠른 가이드
빅 데이터에서는 엄청난 양의 데이터가 사용됩니다. 데이터와 관련하여 두 가지 주요 과제가 있는데, 첫 번째 과제는 대용량 데이터를 수집하는 방법이고 두 번째 과제는 수집 된 데이터를 분석하는 것입니다. 이러한 문제를 극복하려면 메시징 시스템이 필요합니다.
Kafka는 분산 처리량이 많은 시스템을 위해 설계되었습니다. Kafka는보다 전통적인 메시지 브로커의 대체물로 매우 잘 작동하는 경향이 있습니다. 다른 메시징 시스템에 비해 Kafka는 더 나은 처리량, 기본 제공 파티셔닝, 복제 및 고유 한 내결함성을 제공하므로 대규모 메시지 처리 응용 프로그램에 적합합니다.
메시징 시스템이란?
메시징 시스템은 한 응용 프로그램에서 다른 응용 프로그램으로 데이터를 전송하는 역할을하므로 응용 프로그램이 데이터에 집중할 수 있지만 공유 방법에 대해서는 걱정하지 않아도됩니다. 분산 메시징은 안정적인 메시지 대기열 개념을 기반으로합니다. 메시지는 클라이언트 응용 프로그램과 메시징 시스템간에 비동기 적으로 대기합니다. 두 가지 유형의 메시징 패턴을 사용할 수 있습니다. 하나는 포인트 투 포인트이고 다른 하나는 발행-구독 (pub-sub) 메시징 시스템입니다. 대부분의 메시징 패턴은 다음과 같습니다.pub-sub.
지점 간 메시징 시스템
지점 간 시스템에서 메시지는 대기열에 유지됩니다. 하나 이상의 소비자가 큐의 메시지를 사용할 수 있지만 특정 메시지는 최대 한 명의 소비자 만 사용할 수 있습니다. 소비자가 큐에있는 메시지를 읽으면 해당 큐에서 사라집니다. 이 시스템의 전형적인 예는 주문 처리 시스템으로, 각 주문은 하나의 주문 프로세서에 의해 처리되지만 여러 주문 프로세서도 동시에 작동 할 수 있습니다. 다음 다이어그램은 구조를 보여줍니다.
발행-구독 메시징 시스템
발행-구독 시스템에서 메시지는 토픽에 지속됩니다. 지점 간 시스템과 달리 소비자는 하나 이상의 주제를 구독하고 해당 주제의 모든 메시지를 사용할 수 있습니다. Publish-Subscribe 시스템에서 메시지 생성자는 게시자라고하고 메시지 소비자는 구독자라고합니다. 실제 예는 스포츠, 영화, 음악 등과 같은 다양한 채널을 게시하는 Dish TV이며 누구나 자신의 채널 세트를 구독하고 구독 한 채널을 사용할 수있을 때마다받을 수 있습니다.
Kafka는 무엇입니까?
Apache Kafka는 분산 발행-구독 메시징 시스템이며 많은 양의 데이터를 처리 할 수 있고 한 엔드 포인트에서 다른 엔드 포인트로 메시지를 전달할 수있는 강력한 큐입니다. Kafka는 오프라인 및 온라인 메시지 소비 모두에 적합합니다. Kafka 메시지는 디스크에 유지되며 데이터 손실을 방지하기 위해 클러스터 내에서 복제됩니다. Kafka는 ZooKeeper 동기화 서비스 위에 구축되었습니다. 실시간 스트리밍 데이터 분석을 위해 Apache Storm 및 Spark와 매우 잘 통합됩니다.
혜택
다음은 Kafka의 몇 가지 이점입니다-
Reliability − Kafka는 분산, 분할, 복제 및 내결함성입니다.
Scalability − Kafka 메시징 시스템은 다운 타임없이 쉽게 확장됩니다.
Durability− Kafka는
분산 커밋 로그
를 사용하므로 메시지가 가능한 한 빨리 디스크에 유지되므로 내구성이 있습니다.Performance− Kafka는 메시지 게시 및 구독 모두에 대해 높은 처리량을 제공합니다. 많은 TB의 메시지가 저장 되어도 안정적인 성능을 유지합니다.
Kafka는 매우 빠르며 다운 타임이없고 데이터 손실이 전혀 없습니다.
사용 사례
Kafka는 많은 사용 사례에서 사용할 수 있습니다. 그들 중 일부는 아래에 나열되어 있습니다-
Metrics− Kafka는 운영 모니터링 데이터에 자주 사용됩니다. 여기에는 분산 된 애플리케이션의 통계를 집계하여 운영 데이터의 중앙 집중식 피드를 생성하는 것이 포함됩니다.
Log Aggregation Solution − Kafka는 여러 서비스에서 로그를 수집하고 여러 소비자가 표준 형식으로 사용할 수 있도록 조직 전체에서 사용할 수 있습니다.
Stream Processing− Storm 및 Spark Streaming과 같은 인기있는 프레임 워크는 주제에서 데이터를 읽고, 처리하고, 처리 된 데이터를 사용자와 애플리케이션에서 사용할 수있는 새 주제에 씁니다. Kafka의 강력한 내구성은 스트림 처리의 맥락에서도 매우 유용합니다.
Kafka 필요
Kafka는 모든 실시간 데이터 피드를 처리하기위한 통합 플랫폼입니다. Kafka는 짧은 대기 시간 메시지 전달을 지원하고 시스템 오류가있는 경우 내결함성을 보장합니다. 수많은 다양한 소비자를 처리 할 수있는 능력이 있습니다. Kafka는 매우 빠르며 초당 2 백만 번의 쓰기를 수행합니다. Kafka는 모든 데이터를 디스크에 유지하므로 기본적으로 모든 쓰기가 OS (RAM)의 페이지 캐시로 이동합니다. 이렇게하면 페이지 캐시에서 네트워크 소켓으로 데이터를 매우 효율적으로 전송할 수 있습니다.
Kafka에 깊이 들어가기 전에 주제, 브로커, 생산자 및 소비자와 같은 주요 용어를 알아야합니다. 다음 다이어그램은 주요 용어를 설명하고 표는 다이어그램 구성 요소를 자세히 설명합니다.
위의 다이어그램에서 주제는 3 개의 파티션으로 구성됩니다. 파티션 1에는 두 개의 오프셋 요소 0과 1이 있습니다. 파티션 2에는 네 개의 오프셋 요소 0, 1, 2 및 3이 있습니다. 파티션 3에는 하나의 오프셋 요소 0이 있습니다. 복제본의 ID는이를 호스팅하는 서버의 ID와 동일합니다.
주제의 복제 요소가 3으로 설정된 경우 Kafka는 각 파티션의 동일한 복제본 3 개를 만들고 클러스터에 배치하여 모든 작업에 사용할 수 있도록합니다. 클러스터의로드 균형을 맞추기 위해 각 브로커는 이러한 파티션 중 하나 이상을 저장합니다. 여러 생산자와 소비자가 동시에 메시지를 게시하고 검색 할 수 있습니다.
S. 아니 | 구성 요소 및 설명 |
---|---|
1 | Topics 특정 범주에 속하는 메시지 스트림을 주제라고합니다. 데이터는 주제에 저장됩니다. 주제는 파티션으로 분할됩니다. 각 주제에 대해 Kafka는 최소 하나의 파티션을 유지합니다. 이러한 각 파티션에는 순서가 변경되지 않는 순서로 메시지가 포함됩니다. 파티션은 동일한 크기의 세그먼트 파일 세트로 구현됩니다. |
2 | Partition 주제는 많은 파티션을 가질 수 있으므로 임의의 양의 데이터를 처리 할 수 있습니다. |
삼 | Partition offset 분할 된 각 메시지에는 |
4 | Replicas of partition 복제본은 파티션의 |
5 | Brokers
|
6 | Kafka Cluster 하나 이상의 브로커가있는 Kafka를 Kafka 클러스터라고합니다. Kafka 클러스터는 다운 타임없이 확장 할 수 있습니다. 이러한 클러스터는 메시지 데이터의 지속성 및 복제를 관리하는 데 사용됩니다. |
7 | Producers 생산자는 하나 이상의 Kafka 주제에 대한 메시지 게시자입니다. 생산자는 Kafka 중개인에게 데이터를 보냅니다. 생산자가 브로커에 메시지를 게시 할 때마다 브로커는 메시지를 마지막 세그먼트 파일에 추가하기 만하면됩니다. 실제로 메시지는 파티션에 추가됩니다. 생산자는 자신이 선택한 파티션으로 메시지를 보낼 수도 있습니다. |
8 | Consumers 소비자는 브로커에서 데이터를 읽습니다. 소비자는 하나 이상의 주제를 구독하고 브로커에서 데이터를 가져와 게시 된 메시지를 소비합니다. |
9 | Leader
|
10 | Follower 리더 지시를 따르는 노드를 팔로어라고합니다. 리더가 실패하면 추종자 중 한 명이 자동으로 새 리더가됩니다. 팔로어는 일반 소비자 역할을하고 메시지를 가져오고 자체 데이터 저장소를 업데이트합니다. |
다음 그림을 살펴보십시오. Kafka의 클러스터 다이어그램을 보여줍니다.
다음 표는 위 다이어그램에 표시된 각 구성 요소를 설명합니다.
S. 아니 | 구성 요소 및 설명 |
---|---|
1 | Broker Kafka 클러스터는 일반적으로로드 균형을 유지하기 위해 여러 브로커로 구성됩니다. Kafka 브로커는 상태 비 저장이므로 ZooKeeper를 사용하여 클러스터 상태를 유지합니다. 하나의 Kafka 브로커 인스턴스는 초당 수십만 번의 읽기 및 쓰기를 처리 할 수 있으며 각 브로커는 성능에 영향을주지 않고 TB 메시지를 처리 할 수 있습니다. Kafka 브로커 리더 선택은 ZooKeeper에서 수행 할 수 있습니다. |
2 | ZooKeeper ZooKeeper는 Kafka 브로커를 관리하고 조정하는 데 사용됩니다. ZooKeeper 서비스는 주로 생산자와 소비자에게 Kafka 시스템에 새로운 브로커의 존재 또는 Kafka 시스템의 브로커 실패를 알리는 데 사용됩니다. 브로커의 존재 또는 실패와 관련하여 Zookeeper가받은 알림에 따라 생산자와 소비자는 결정을 내리고 다른 브로커와 작업을 조정하기 시작합니다. |
삼 | Producers 생산자는 데이터를 브로커에게 푸시합니다. 새 브로커가 시작되면 모든 생산자가이를 검색하고 자동으로 해당 새 브로커에 메시지를 보냅니다. Kafka 생산자는 브로커의 승인을 기다리지 않고 브로커가 처리 할 수있는 한 빨리 메시지를 보냅니다. |
4 | Consumers Kafka 브로커는 상태 비 저장이므로 소비자는 파티션 오프셋을 사용하여 소비 된 메시지 수를 유지해야합니다. 소비자가 특정 메시지 오프셋을 확인하면 소비자가 이전의 모든 메시지를 사용했음을 의미합니다. 소비자는 사용할 준비가 된 바이트 버퍼를 갖도록 브로커에 비동기 풀 요청을 발행합니다. 소비자는 단순히 오프셋 값을 제공하여 파티션의 어느 지점 으로든 되감거나 건너 뛸 수 있습니다. 소비자 오프셋 값은 ZooKeeper에서 알립니다. |
지금까지 Kafka의 핵심 개념에 대해 논의했습니다. 이제 Kafka의 워크 플로우에 대해 조명 해 보겠습니다.
Kafka는 단순히 하나 이상의 파티션으로 분할 된 주제 모음입니다. Kafka 파티션은 선형 순서로 정렬 된 메시지 시퀀스로, 각 메시지는 인덱스 (오프셋이라고 함)로 식별됩니다. Kafka 클러스터의 모든 데이터는 분리 된 파티션 결합입니다. 들어오는 메시지는 파티션 끝에 기록되고 소비자는 메시지를 순차적으로 읽습니다. 메시지를 서로 다른 브로커에 복제하여 내구성을 제공합니다.
Kafka는 빠르고 안정적이며 지속적인 내결함성 및 제로 다운 타임 방식으로 pub-sub 및 큐 기반 메시징 시스템을 모두 제공합니다. 두 경우 모두 생산자는 단순히 주제에 메시지를 보내고 소비자는 필요에 따라 한 가지 유형의 메시징 시스템을 선택할 수 있습니다. 소비자가 자신이 선택한 메시징 시스템을 선택하는 방법을 이해하려면 다음 섹션의 단계를 따르십시오.
Pub-Sub 메시징의 워크 플로
다음은 Pub-Sub 메시징의 단계별 워크 플로입니다.
생산자는 정기적으로 주제에 메시지를 보냅니다.
Kafka 브로커는 특정 주제에 대해 구성된 파티션에 모든 메시지를 저장합니다. 메시지가 파티션간에 동일하게 공유되도록합니다. 생산자가 두 개의 메시지를 보내고 두 개의 파티션이있는 경우 Kafka는 첫 번째 파티션에 하나의 메시지를 저장하고 두 번째 파티션에 두 번째 메시지를 저장합니다.
소비자는 특정 주제를 구독합니다.
소비자가 토픽을 구독하면 Kafka는 토픽의 현재 오프셋을 소비자에게 제공하고 오프셋을 Zookeeper 앙상블에 저장합니다.
소비자는 새 메시지에 대해 일정한 간격 (예 : 100Ms)으로 Kafka를 요청합니다.
Kafka는 생산자로부터 메시지를 받으면 이러한 메시지를 소비자에게 전달합니다.
소비자는 메시지를 수신하고 처리합니다.
메시지가 처리되면 소비자는 Kafka 브로커에 승인을 보냅니다.
Kafka가 승인을 받으면 오프셋을 새 값으로 변경하고 Zookeeper에서 업데이트합니다. 오프셋이 Zookeeper에서 유지되기 때문에 소비자는 서버 격분 중에도 다음 메시지를 올바르게 읽을 수 있습니다.
위의 흐름은 소비자가 요청을 중지 할 때까지 반복됩니다.
소비자는 언제든지 원하는 주제의 오프셋으로 되감거나 건너 뛰고 모든 후속 메시지를 읽을 수 있습니다.
대기열 메시징 / 소비자 그룹의 워크 플로
단일 소비자 대신 대기열 메시징 시스템에서 동일한 그룹 ID를
가진 소비자 그룹
이 주제를 구독합니다. 간단히 말해서, 동일한 그룹 ID
로 주제를 구독하는 소비자 는 단일 그룹으로 간주되고 메시지가 그들간에 공유됩니다. 이 시스템의 실제 작업 흐름을 확인하겠습니다.
생산자는 일정한 간격으로 주제에 메시지를 보냅니다.
Kafka는 이전 시나리오와 유사한 특정 주제에 대해 구성된 파티션에 모든 메시지를 저장합니다.
단일 소비자가 특정 주제를 구독 하고
그룹 ID
가Group-1 인
Topic-01
이라고 가정 합니다.Kafka는 새 소비자 가
Group-1
과 동일한그룹 ID
로 동일한 주제 인Topic-01
을 구독 할 때까지 Pub-Sub 메시징과 동일한 방식으로 소비자와 상호 작용합니다 .새로운 소비자가 도착하면 Kafka는 작업을 공유 모드로 전환하고 두 소비자간에 데이터를 공유합니다. 이 공유는 소비자 수가 해당 특정 주제에 대해 구성된 파티션 수에 도달 할 때까지 계속됩니다.
소비자 수가 파티션 수를 초과하면 기존 소비자 중 하나가 구독을 취소 할 때까지 새 소비자는 더 이상 메시지를받지 않습니다. 이 시나리오는 Kafka의 각 소비자에게 최소 하나의 파티션이 할당되고 모든 파티션이 기존 소비자에게 할당되면 새 소비자가 기다려야하기 때문에 발생합니다.
이 기능을
소비자 그룹
이라고도 합니다. 같은 방식으로 Kafka는 매우 간단하고 효율적인 방식으로 두 시스템의 장점을 모두 제공합니다.
ZooKeeper의 역할
Apache Kafka의 중요한 종속성은 분산 구성 및 동기화 서비스 인 Apache Zookeeper입니다. Zookeeper는 Kafka 브로커와 소비자 간의 조정 인터페이스 역할을합니다. Kafka 서버는 Zookeeper 클러스터를 통해 정보를 공유합니다. Kafka는 주제, 브로커, 소비자 오프셋 (대기열 판독기) 등에 대한 정보와 같은 기본 메타 데이터를 Zookeeper에 저장합니다.
모든 중요한 정보가 Zookeeper에 저장되고 일반적으로이 데이터를 앙상블 전체에 복제하므로 Kafka 브로커 / Zookeeper의 실패는 Kafka 클러스터의 상태에 영향을주지 않습니다. Kafka는 Zookeeper가 다시 시작되면 상태를 복원합니다. 이렇게하면 Kafka의 다운 타임이 없습니다. Kafka 브로커 간의 리더 선택은 리더 실패시 Zookeeper를 사용하여 수행됩니다.
더 사육사의 내용을 참조하십시오 사육사
다음 장에서 시스템에 Java, ZooKeeper 및 Kafka를 설치하는 방법에 대해 계속 설명하겠습니다.
다음은 시스템에 Java를 설치하는 단계입니다.
1 단계-Java 설치 확인
지금 당장 시스템에 Java를 이미 설치했으면하므로 다음 명령을 사용하여 확인하십시오.
$ java -version
Java가 시스템에 성공적으로 설치되면 설치된 Java의 버전을 볼 수 있습니다.
1.1 단계-JDK 다운로드
Java가 다운로드되지 않은 경우 다음 링크를 방문하여 최신 버전의 JDK를 다운로드하고 최신 버전을 다운로드하십시오.
http://www.oracle.com/technetwork/java/javase/downloads/index.html이제 최신 버전은 JDK 8u 60이고 파일은 "jdk-8u60-linux-x64.tar.gz"입니다. 컴퓨터에 파일을 다운로드하십시오.
1.2 단계-파일 추출
일반적으로 다운로드중인 파일은 다운로드 폴더에 저장되며, 다음 명령을 사용하여 tar 설정을 확인하고 압축을 풉니 다.
$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz
1.3 단계-Opt 디렉터리로 이동
모든 사용자가 Java를 사용할 수 있도록하려면 추출 된 Java 컨텐츠를 usr / local / java
/ 폴더로 이동하십시오.
$ su
password: (type password of root user)
$ mkdir /opt/jdk $ mv jdk-1.8.0_60 /opt/jdk/
1.4 단계-경로 설정
경로 및 JAVA_HOME 변수를 설정하려면 ~ / .bashrc 파일에 다음 명령을 추가하십시오.
export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin
이제 모든 변경 사항을 현재 실행중인 시스템에 적용하십시오.
$ source ~/.bashrc
1.5 단계-Java 대안
다음 명령을 사용하여 Java Alternatives를 변경하십시오.
update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100
Step 1.6 − 이제 1 단계에서 설명한 확인 명령 (java -version)을 사용하여 Java를 확인합니다.
2 단계-ZooKeeper 프레임 워크 설치
2.1 단계-ZooKeeper 다운로드
컴퓨터에 ZooKeeper 프레임 워크를 설치하려면 다음 링크를 방문하여 최신 버전의 ZooKeeper를 다운로드하십시오.
http://zookeeper.apache.org/releases.html현재 ZooKeeper의 최신 버전은 3.4.6 (ZooKeeper-3.4.6.tar.gz)입니다.
2.2 단계-tar 파일 추출
다음 명령을 사용하여 tar 파일을 추출하십시오.
$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz $ cd zookeeper-3.4.6
$ mkdir data
2.3 단계-구성 파일 생성
vi "conf / zoo.cfg"명령과 시작점으로 설정할 다음 모든 매개 변수를 사용하여 conf / zoo.cfg
라는 구성 파일을 엽니 다 .
$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2
구성 파일이 성공적으로 저장되고 터미널로 다시 돌아 가면 zookeeper 서버를 시작할 수 있습니다.
2.4 단계-ZooKeeper 서버 시작
$ bin/zkServer.sh start
이 명령을 실행하면 아래와 같은 응답을 받게됩니다.
$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg $ Starting zookeeper ... STARTED
2.5 단계-CLI 시작
$ bin/zkCli.sh
위의 명령어를 입력하면 사육사 서버에 연결되고 아래와 같은 응답을 받게됩니다.
Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]
2.6 단계-Zookeeper Server 중지
서버를 연결하고 모든 작업을 수행 한 후 다음 명령을 사용하여 사육사 서버를 중지 할 수 있습니다.
$ bin/zkServer.sh stop
이제 컴퓨터에 Java 및 ZooKeeper를 성공적으로 설치했습니다. Apache Kafka를 설치하는 단계를 살펴 보겠습니다.
3 단계-Apache Kafka 설치
다음 단계를 계속하여 컴퓨터에 Kafka를 설치하겠습니다.
3.1 단계-Kafka 다운로드
컴퓨터에 Kafka를 설치하려면 아래 링크를 클릭하십시오.
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz이제 최신 버전입니다. kafka_2.11_0.9.0.0.tgz 컴퓨터에 다운로드됩니다.
3.2 단계-tar 파일 추출
다음 명령을 사용하여 tar 파일을 추출하십시오-
$ cd opt/ $ tar -zxf kafka_2.11.0.9.0.0 tar.gz
$ cd kafka_2.11.0.9.0.0
이제 컴퓨터에 최신 버전의 Kafka를 다운로드했습니다.
3.3 단계-서버 시작
다음 명령을 제공하여 서버를 시작할 수 있습니다-
$ bin/kafka-server-start.sh config/server.properties
서버가 시작되면 화면에 다음과 같은 응답이 표시됩니다.
$ bin/kafka-server-start.sh config/server.properties
[2016-01-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
…………………………………………….
…………………………………………….
4 단계-서버 중지
모든 작업을 수행 한 후 다음 명령을 사용하여 서버를 중지 할 수 있습니다.
$ bin/kafka-server-stop.sh config/server.properties
Kafka 설치에 대해 이미 논의 했으므로 다음 장에서 Kafka에서 기본 작업을 수행하는 방법을 배울 수 있습니다.
먼저 단일 노드-단일 브로커
구성 구현 을 시작한 다음 설정을 단일 노드-다중 브로커 구성으로 마이그레이션합니다.
지금 쯤이면 컴퓨터에 Java, ZooKeeper 및 Kafka를 설치했을 것입니다. Kafka 클러스터 설정으로 이동하기 전에 Kafka 클러스터가 ZooKeeper를 사용하므로 먼저 ZooKeeper를 시작해야합니다.
ZooKeeper 시작
새 터미널을 열고 다음 명령을 입력하십시오-
bin/zookeeper-server-start.sh config/zookeeper.properties
Kafka Broker를 시작하려면 다음 명령을 입력하십시오.
bin/kafka-server-start.sh config/server.properties
Kafka Broker를 시작한 후 ZooKeeper 터미널에 jps
명령을 입력 하면 다음과 같은 응답이 표시됩니다.
821 QuorumPeerMain
928 Kafka
931 Jps
이제 QuorumPeerMain이 ZooKeeper 데몬이고 다른 하나가 Kafka 데몬 인 터미널에서 실행중인 두 데몬을 볼 수 있습니다.
단일 노드-단일 브로커 구성
이 구성에는 단일 ZooKeeper 및 브로커 ID 인스턴스가 있습니다. 다음은 그것을 구성하는 단계입니다-
Creating a Kafka Topic− Kafka는 kafka-topics.sh
라는 명령 줄 유틸리티를 제공 하여 서버에 주제를 생성합니다. 새 터미널을 열고 아래 예제를 입력하십시오.
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic topic-name
Example
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic Hello-Kafka
단일 파티션과 하나의 복제 요소가있는 Hello-Kafka
라는 주제를 방금 만들었습니다 . 위에서 생성 된 출력은 다음 출력과 유사합니다.
Output− Hello-Kafka
주제 생성
토픽이 생성되면 Kafka 브로커 터미널 창에서 알림을받을 수 있으며 config / server.properties 파일의 "/ tmp / kafka-logs /"에 지정된 생성 된 토픽에 대한 로그를 얻을 수 있습니다.
주제 목록
Kafka 서버에서 주제 목록을 얻으려면 다음 명령을 사용할 수 있습니다.
Syntax
bin/kafka-topics.sh --list --zookeeper localhost:2181
Output
Hello-Kafka
주제를 만들었으므로 Hello-Kafka
만 나열 됩니다. 둘 이상의 주제를 작성하면 출력에 주제 이름이 표시된다고 가정하십시오.
생산자를 시작하여 메시지 보내기
Syntax
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
위의 구문에서 생산자 명령 줄 클라이언트에는 두 가지 주요 매개 변수가 필요합니다.
Broker-list− 메시지를 보내려는 브로커 목록. 이 경우에는 브로커가 하나만 있습니다. Config / server.properties 파일에는 브로커가 포트 9092에서 수신 대기한다는 것을 알고 있으므로 브로커 포트 ID가 포함되어 있으므로 직접 지정할 수 있습니다.
주제 이름-다음은 주제 이름의 예입니다.
Example
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
생산자는 stdin의 입력을 기다리고 Kafka 클러스터에 게시합니다. 기본적으로 모든 새 줄은 새 메시지로 게시되고 기본 생산자 속성은 config / producer.properties
파일에 지정 됩니다. 이제 아래와 같이 터미널에 몇 줄의 메시지를 입력 할 수 있습니다.
Output
$ bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Hello-Kafka[2016-01-16 13:50:45,931]
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message
소비자가 메시지를 수신하도록 시작
생산자와 마찬가지로 기본 소비자 속성은 config / consumer.proper-ties
파일에 지정 됩니다. 새 터미널을 열고 메시지 사용을 위해 아래 구문을 입력하십시오.
Syntax
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name
--from-beginning
Example
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka
--from-beginning
Output
Hello
My first message
My second message
마지막으로 생산자의 터미널에서 메시지를 입력하고 소비자의 터미널에 나타나는 메시지를 볼 수 있습니다. 현재로서는 단일 브로커가있는 단일 노드 클러스터에 대해 매우 잘 이해하고 있습니다. 이제 다중 브로커 구성으로 이동하겠습니다.
단일 노드-다중 브로커 구성
다중 브로커 클러스터 설정으로 이동하기 전에 먼저 ZooKeeper 서버를 시작하십시오.
Create Multiple Kafka Brokers− con-fig / server.properties에 이미 하나의 Kafka 브로커 인스턴스가 있습니다. 이제 여러 브로커 인스턴스가 필요하므로 기존 server.prop-erties 파일을 두 개의 새 구성 파일로 복사하고 이름을 server-one.properties 및 server-two.prop-erties로 바꿉니다. 그런 다음 두 새 파일을 모두 편집하고 다음 변경 사항을 할당하십시오.
config / server-one.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1
config / server-two.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2
Start Multiple Brokers− 세 개의 서버에서 모든 변경을 수행 한 후 세 개의 새 터미널을 열어 각 브로커를 하나씩 시작합니다.
Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties
이제 우리는 머신에서 3 개의 다른 브로커를 실행하고 있습니다. 직접 입력하여 모든 데몬을 확인하십시오.jps ZooKeeper 터미널에서 응답을 볼 수 있습니다.
주제 만들기
세 개의 다른 브로커가 실행 중이므로이 주제에 대해 복제 요소 값을 3으로 지정하겠습니다. 브로커가 두 개인 경우 할당 된 복제본 값은 2가됩니다.
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic topic-name
Example
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic Multibrokerapplication
Output
created topic “Multibrokerapplication”
설명
명령은 아래와 같이 현재 생성 된 주제에 수신되는 브로커를 확인하는 데 사용됩니다 -
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation
Output
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation
Topic:Multibrokerapplication PartitionCount:1
ReplicationFactor:3 Configs:
Topic:Multibrokerapplication Partition:0 Leader:0
Replicas:0,2,1 Isr:0,2,1
위의 출력에서 첫 번째 줄은 이미 선택한 주제 이름, 파티션 수 및 복제 요소를 보여주는 모든 파티션의 요약을 제공한다는 결론을 내릴 수 있습니다. 두 번째 줄에서 각 노드는 무작위로 선택된 파티션 부분의 리더가됩니다.
우리의 경우 첫 번째 브로커 (broker.id 0)가 리더임을 알 수 있습니다. 그런 다음 Replicas : 0,2,1은 모든 브로커가 주제를 복제 함을 의미합니다. 마지막으로 Isr
은 동기화 된
복제본 의 집합입니다 . 글쎄, 이것은 현재 살아 있고 리더가 잡은 복제본의 하위 집합입니다.
생산자를 시작하여 메시지 보내기
이 절차는 단일 브로커 설정과 동일합니다.
Example
bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Multibrokerapplication
Output
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message
소비자가 메시지를 수신하도록 시작
이 절차는 단일 브로커 설정에 표시된 것과 동일합니다.
Example
bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion --from-beginning
Output
bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message
기본 주제 작업
이 장에서는 다양한 기본 주제 작업에 대해 설명합니다.
주제 수정
Kafka Cluster에서 주제를 만드는 방법을 이미 이해했듯이. 이제 다음 명령을 사용하여 생성 된 주제를 수정하겠습니다.
Syntax
bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name
--parti-tions count
Example
We have already created a topic “Hello-Kafka” with single partition count and one replica factor.
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181
--alter --topic Hello-kafka --parti-tions 2
Output
WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
주제 삭제
주제를 삭제하려면 다음 구문을 사용할 수 있습니다.
Syntax
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
Example
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka
Output
> Topic Hello-kafka marked for deletion
Note −다음과 같은 경우에는 영향이 없습니다. delete.topic.enable true로 설정되지 않음
Java 클라이언트를 사용하여 메시지를 게시하고 소비하기위한 응용 프로그램을 만들어 보겠습니다. Kafka 생산자 클라이언트는 다음 API로 구성됩니다.
KafkaProducer API
이 섹션에서 가장 중요한 Kafka 생산자 API 세트를 이해하겠습니다. KafkaProducer API의 중심 부분은 KafkaProducer
클래스입니다. KafkaProducer 클래스는 다음 메소드를 사용하여 생성자에서 Kafka 브로커를 연결하는 옵션을 제공합니다.
KafkaProducer 클래스는 주제에 비동기 적으로 메시지를 보내는 send 메소드를 제공합니다. send ()의 서명은 다음과 같습니다.
producer.send(new ProducerRecord<byte[],byte[]>(topic,
partition, key1, value1) , callback);
ProducerRecord − 생산자는 전송 대기중인 레코드 버퍼를 관리합니다.
Callback − 서버가 레코드를 승인했을 때 실행할 사용자 제공 콜백 (null은 콜백이 없음을 나타냄).
KafkaProducer 클래스는 이전에 보낸 모든 메시지가 실제로 완료되었는지 확인하는 flush 메서드를 제공합니다. flush 메소드의 구문은 다음과 같습니다.
public void flush()
KafkaProducer 클래스는 주어진 주제에 대한 파티션 메타 데이터를 가져 오는 데 도움이되는 partitionFor 메서드를 제공합니다. 사용자 지정 분할에 사용할 수 있습니다. 이 방법의 서명은 다음과 같습니다.
public Map metrics()
생산자가 유지 관리하는 내부 메트릭 맵을 반환합니다.
public void close () − KafkaProducer 클래스는 이전에 보낸 모든 요청이 완료 될 때까지 close 메서드 블록을 제공합니다.
생산자 API
Producer API의 중심 부분은 Producer
클래스입니다. Producer 클래스는 다음과 같은 방법으로 생성자에서 Kafka 브로커를 연결하는 옵션을 제공합니다.
생산자 클래스
생산자 클래스는 send 다음 서명을 사용하여 단일 또는 여러 주제에 대한 메시지.
public void send(KeyedMessaget<k,v> message)
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);
생산자에는 두 가지 유형이 있습니다. Sync 과 Async.
동일한 API 구성이 동기화
생산자 에도 적용됩니다 . 차이점은 동기화 생산자가 메시지를 직접 전송하지만 백그라운드에서 메시지를 전송한다는 것입니다. 더 높은 처리량을 원할 때 비동기 생산자가 선호됩니다. 0.8과 같은 이전 릴리스에서 비동기 생산자는 오류 핸들러를 등록하기위한 send ()에 대한 콜백이 없습니다. 이것은 0.9의 현재 릴리스에서만 사용할 수 있습니다.
공개 무효 close ()
Producer 클래스는 close 모든 Kafka 브로커에 대한 생산자 풀 연결을 닫는 방법.
구성 설정
Producer API의 주요 구성 설정은 이해를 돕기 위해 다음 표에 나열되어 있습니다.
S. 아니 | 구성 설정 및 설명 |
---|---|
1 | client.id 생산자 애플리케이션 식별 |
2 | producer.type 동기화 또는 비동기 |
삼 | acks acks 구성은 생산자 요청이 완료된 것으로 간주되는 기준을 제어합니다. |
4 | retries 생산자 요청이 실패하면 특정 값으로 자동 재 시도합니다. |
5 | bootstrap.servers 브로커의 부트 스트랩 목록입니다. |
6 | linger.ms 요청 수를 줄이려면 linger.ms를 어떤 값보다 큰 값으로 설정할 수 있습니다. |
7 | key.serializer 직렬 변환기 인터페이스의 키입니다. |
8 | value.serializer serializer 인터페이스의 값입니다. |
9 | batch.size 버퍼 크기. |
10 | buffer.memory 버퍼링을 위해 생산자가 사용할 수있는 총 메모리 양을 제어합니다. |
ProducerRecord API
ProducerRecord는 다음 서명을 사용하여 파티션, 키 및 값 쌍이있는 레코드를 생성하기 위해 Kafka cluster.ProducerRecord 클래스 생성자로 전송되는 키 / 값 쌍입니다.
public ProducerRecord (string topic, int partition, k key, v value)
Topic − 기록에 추가 될 사용자 정의 주제 이름.
Partition − 파티션 수
Key − 기록에 포함될 키.
- Value − 내용 기록
public ProducerRecord (string topic, k key, v value)
ProducerRecord 클래스 생성자는 키, 값 쌍이 있고 파티션이없는 레코드를 만드는 데 사용됩니다.
Topic − 기록을 할당 할 주제를 만듭니다.
Key − 기록을위한 키.
Value − 기록 내용.
public ProducerRecord (string topic, v value)
ProducerRecord 클래스는 파티션 및 키없이 레코드를 만듭니다.
Topic − 주제를 만듭니다.
Value − 기록 내용.
ProducerRecord 클래스 메소드는 다음 표에 나열되어 있습니다.
S. 아니 | 클래스 방법 및 설명 |
---|---|
1 | public string topic() 주제가 레코드에 추가됩니다. |
2 | public K key() 기록에 포함될 키입니다. 그러한 키가 없으면 여기에서 null이 반환됩니다. |
삼 | public V value() 내용을 기록하십시오. |
4 | partition() 레코드의 파티션 수 |
SimpleProducer 애플리케이션
응용 프로그램을 만들기 전에 먼저 ZooKeeper 및 Kafka 브로커를 시작한 다음 create topic 명령을 사용하여 Kafka 브로커에서 고유 한 주제를 생성합니다. 그 후 Sim-pleProducer.java
라는 Java 클래스를 만들고 다음 코딩을 입력하십시오.
//import util.properties packages
import java.util.Properties;
//import simple producer packages
import org.apache.kafka.clients.producer.Producer;
//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;
//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;
//Create java class named “SimpleProducer”
public class SimpleProducer {
public static void main(String[] args) throws Exception{
// Check arguments length value
if(args.length == 0){
System.out.println("Enter topic name”);
return;
}
//Assign topicName to string variable
String topicName = args[0].toString();
// create instance for properties to access producer configs
Properties props = new Properties();
//Assign localhost id
props.put("bootstrap.servers", “localhost:9092");
//Set acknowledgements for producer requests.
props.put("acks", “all");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
//Specify buffer size in config
props.put("batch.size", 16384);
//Reduce the no of requests less than 0
props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
Producer<String, String> producer = new KafkaProducer
<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName,
Integer.toString(i), Integer.toString(i)));
System.out.println(“Message sent successfully”);
producer.close();
}
}
Compilation − 다음 명령을 사용하여 응용 프로그램을 컴파일 할 수 있습니다.
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
Execution − 다음 명령을 사용하여 응용 프로그램을 실행할 수 있습니다.
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>
Output
Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10
간단한 소비자 예
지금까지 Kafka 클러스터에 메시지를 보내는 생산자를 만들었습니다. 이제 Kafka 클러스터에서 메시지를 소비하는 소비자를 만들어 보겠습니다. KafkaConsumer API는 Kafka 클러스터에서 메시지를 사용하는 데 사용됩니다. KafkaConsumer 클래스 생성자는 아래에 정의되어 있습니다.
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
configs − 소비자 구성 맵을 반환합니다.
KafkaConsumer 클래스에는 아래 표에 나열된 다음과 같은 중요한 메서드가 있습니다.
S. 아니 | 방법 및 설명 |
---|---|
1 | public java.util.Set<TopicPar-tition> assignment() 소비자가 현재 할당 한 파티션 세트를 가져옵니다. |
2 | public string subscription() 동적으로 서명 된 파티션을 얻으려면 주어진 주제 목록을 구독하십시오. |
삼 | public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener) 동적으로 서명 된 파티션을 얻으려면 주어진 주제 목록을 구독하십시오. |
4 | public void unsubscribe() 주어진 파티션 목록에서 주제를 구독 취소하십시오. |
5 | public void sub-scribe(java.util.List<java.lang.String> topics) 동적으로 서명 된 파티션을 얻으려면 주어진 주제 목록을 구독하십시오. 주어진 주제 목록이 비어 있으면 unsubscribe ()와 동일하게 처리됩니다. |
6 | public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener) 인수 패턴은 정규식 형식의 구독 패턴을 참조하고 리스너 인수는 구독 패턴에서 알림을받습니다. |
7 | public void as-sign(java.util.List<TopicParti-tion> partitions) 고객에게 파티션 목록을 수동으로 할당합니다. |
8 | poll() 구독 / 할당 API 중 하나를 사용하여 지정된 주제 또는 파티션에 대한 데이터를 가져옵니다. 데이터를 폴링하기 전에 토픽을 구독하지 않으면 오류가 반환됩니다. |
9 | public void commitSync() Commit offsets returned on the last poll() for all the sub-scribed list of topics and partitions. The same operation is applied to commitAsyn(). |
10 | public void seek(TopicPartition partition, long offset) Fetch the current offset value that consumer will use on the next poll() method. |
11 | public void resume() Resume the paused partitions. |
12 | public void wakeup() Wakeup the consumer. |
ConsumerRecord API
The ConsumerRecord API is used to receive records from the Kafka cluster. This API consists of a topic name, partition number, from which the record is being received and an offset that points to the record in a Kafka partition. ConsumerRecord class is used to create a consumer record with specific topic name, partition count and <key, value> pairs. It has the following signature.
public ConsumerRecord(string topic,int partition, long offset,K key, V value)
Topic − The topic name for consumer record received from the Kafka cluster.
Partition − Partition for the topic.
Key − The key of the record, if no key exists null will be returned.
Value − Record contents.
ConsumerRecords API
ConsumerRecords API acts as a container for ConsumerRecord. This API is used to keep the list of ConsumerRecord per partition for a particular topic. Its Constructor is defined below.
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
TopicPartition − Return a map of partition for a particular topic.
Records − Return list of ConsumerRecord.
ConsumerRecords class has the following methods defined.
S.No | Methods and Description |
---|---|
1 | public int count() The number of records for all the topics. |
2 | public Set partitions() The set of partitions with data in this record set (if no data was returned then the set is empty). |
3 | public Iterator iterator() Iterator enables you to cycle through a collection, obtaining or re-moving elements. |
4 | public List records() Get list of records for the given partition. |
구성 설정
소비자 클라이언트 API 기본 구성 설정에 대한 구성 설정은 다음과 같습니다.
S. 아니 | 설정 및 설명 |
---|---|
1 | bootstrap.servers 브로커 목록을 부트 스트랩합니다. |
2 | group.id 개별 소비자를 그룹에 할당합니다. |
삼 | enable.auto.commit 값이 true이면 오프셋에 대해 자동 커밋을 활성화하고 그렇지 않으면 커밋되지 않습니다. |
4 | auto.commit.interval.ms 업데이트 된 소비 오프셋이 ZooKeeper에 기록되는 빈도를 반환합니다. |
5 | session.timeout.ms Kafka가 메시지를 포기하고 계속 사용하기 전에 ZooKeeper가 요청 (읽기 또는 쓰기)에 응답 할 때까지 기다리는 시간 (밀리 초)을 나타냅니다. |
SimpleConsumer 애플리케이션
생산자 신청 단계는 여기에서 동일하게 유지됩니다. 먼저 ZooKeeper 및 Kafka 브로커를 시작하십시오. 그런 다음 생성 SimpleConsumer의
이름의 자바 클래스와 응용 프로그램 SimpleCon-sumer.java을
하고 다음 코드를 입력합니다.
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
//Kafka consumer configuration settings
String topicName = args[0].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicName))
//print the topic name
System.out.println("Subscribed to topic " + topicName);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = con-sumer.poll(100);
for (ConsumerRecord<String, String> record : records)
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
Compilation − 다음 명령을 사용하여 응용 프로그램을 컴파일 할 수 있습니다.
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
Execution − 다음 명령을 사용하여 응용 프로그램을 실행할 수 있습니다.
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>
Input− 생산자 CLI를 열고 주제에 몇 가지 메시지를 보냅니다. 간단한 입력을 'Hello Consumer'로 입력 할 수 있습니다.
Output − 다음은 출력됩니다.
Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer
소비자 그룹은 Kafka 주제의 다중 스레드 또는 다중 시스템 소비입니다.
소비자 그룹
소비자는 동일한
group.id
를 사용하여 그룹에 가입 할 수 있습니다.
그룹의 최대 병렬 처리는 그룹의 소비자 수 ← 파티션 수입니다.
Kafka는 그룹의 소비자에게 주제의 파티션을 할당하므로 각 파티션은 그룹의 정확히 한 소비자가 사용합니다.
Kafka는 그룹의 단일 소비자 만 메시지를 읽도록 보장합니다.
소비자는 로그에 저장된 순서대로 메시지를 볼 수 있습니다.
소비자의 재조정
더 많은 프로세스 / 스레드를 추가하면 Kafka가 재조정됩니다. 소비자 또는 브로커가 ZooKeeper로 하트 비트를 보내지 못하는 경우 Kafka 클러스터를 통해 재구성 할 수 있습니다. 이 재조정 동안 Kafka는 사용 가능한 스레드에 사용 가능한 파티션을 할당하여 파티션을 다른 프로세스로 이동할 수 있습니다.
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {
public static void main(String[] args) throws Exception {
if(args.length < 2){
System.out.println("Usage: consumer <topic> <groupname>");
return;
}
String topic = args[0].toString();
String group = args[1].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
편집
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java
실행
>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
여기 에서는 두 명의 소비자가있는 my-group
으로 샘플 그룹 이름을 만들었습니다 . 마찬가지로 그룹과 그룹의 소비자 수를 만들 수 있습니다.
입력
생산자 CLI를 열고 다음과 같은 메시지를 보냅니다.
Test consumer group 01
Test consumer group 02
첫 번째 프로세스의 출력
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01
두 번째 프로세스의 출력
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02
이제 Java 클라이언트 데모를 사용하여 SimpleConsumer 및 ConsumeGroup을 이해했을 것입니다. 이제 Java 클라이언트를 사용하여 메시지를 보내고받는 방법에 대한 아이디어를 얻었습니다. 다음 장에서 빅 데이터 기술과 Kafka 통합을 계속하겠습니다.
이 장에서는 Kafka를 Apache Storm과 통합하는 방법을 배웁니다.
Storm 정보
Storm은 원래 Nathan Marz와 BackType 팀이 만들었습니다. 단기간에 Apache Storm은 방대한 양의 데이터를 처리 할 수있는 분산 실시간 처리 시스템의 표준이되었습니다. Storm은 매우 빠르며 벤치 마크에 따르면 노드 당 초당 처리되는 튜플이 백만 개가 넘습니다. Apache Storm은 지속적으로 실행되어 구성된 소스 (Spouts)의 데이터를 사용하고 데이터를 처리 파이프 라인 (Bolts)으로 전달합니다. 결합 된 주둥이와 볼트는 토폴로지를 만듭니다.
Storm과 통합
Kafka와 Storm은 자연스럽게 서로를 보완하며 강력한 협력을 통해 빠르게 움직이는 빅 데이터에 대한 실시간 스트리밍 분석을 가능하게합니다. Kafka와 Storm 통합은 개발자가 Storm 토폴로지에서 데이터 스트림을 더 쉽게 수집하고 게시 할 수 있도록하기위한 것입니다.
개념적 흐름
주둥이는 개울의 근원입니다. 예를 들어, 스파우트는 Kafka 주제에서 튜플을 읽고 스트림으로 내보낼 수 있습니다. 볼트는 입력 스트림을 소비하고 처리하고 새 스트림을 방출 할 수 있습니다. Bolts는 함수 실행, 튜플 필터링, 스트리밍 집계, 스트리밍 조인, 데이터베이스와의 대화 등 모든 작업을 수행 할 수 있습니다. Storm 토폴로지의 각 노드는 병렬로 실행됩니다. 토폴로지는 종료 할 때까지 무기한 실행됩니다. Storm은 실패한 작업을 자동으로 재 할당합니다. 또한 Storm은 시스템이 다운되고 메시지가 삭제 되더라도 데이터 손실이 없음을 보장합니다.
Kafka-Storm 통합 API를 자세히 살펴 보겠습니다. Kafka를 Storm과 통합하는 세 가지 주요 클래스가 있습니다. 그들은 다음과 같습니다-
BrokerHosts-ZkHosts 및 StaticHosts
BrokerHosts는 인터페이스이며 ZkHosts 및 StaticHosts는 두 가지 주요 구현입니다. ZkHosts는 ZooKeeper에서 세부 정보를 유지하여 Kafka 브로커를 동적으로 추적하는 데 사용되는 반면 StaticHosts는 Kafka 브로커 및 세부 정보를 수동 / 정적으로 설정하는 데 사용됩니다. ZkHosts는 Kafka 브로커에 액세스하는 간단하고 빠른 방법입니다.
ZkHosts의 서명은 다음과 같습니다.
public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)
brokerZkStr은 ZooKeeper 호스트이고 brokerZkPath는 Kafka 브로커 세부 정보를 유지하기위한 ZooKeeper 경로입니다.
KafkaConfig API
이 API는 Kafka 클러스터에 대한 구성 설정을 정의하는 데 사용됩니다. Kafka Con-fig의 서명은 다음과 같이 정의됩니다.
public KafkaConfig(BrokerHosts hosts, string topic)
Hosts − BrokerHost는 ZkHosts / StaticHosts가 될 수 있습니다.
Topic − 주제 이름.
SpoutConfig API
Spoutconfig는 추가 ZooKeeper 정보를 지원하는 KafkaConfig의 확장입니다.
public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
Hosts − BrokerHosts는 BrokerHosts 인터페이스의 모든 구현이 될 수 있습니다.
Topic − 주제 이름.
zkRoot − ZooKeeper 루트 경로.
id −주둥이는 Zookeeper에서 소비 한 오프셋 상태를 저장합니다. ID는 스파우트를 고유하게 식별해야합니다.
SchemeAsMultiScheme
SchemeAsMultiScheme은 Kafka에서 소비 된 ByteBuffer가 스톰 튜플로 변환되는 방식을 지시하는 인터페이스입니다. MultiScheme에서 파생되었으며 Scheme 클래스의 구현을 허용합니다. Scheme 클래스의 많은 구현이 있으며 이러한 구현 중 하나는 바이트를 간단한 문자열로 구문 분석하는 StringScheme입니다. 또한 출력 필드의 이름을 제어합니다. 서명은 다음과 같이 정의됩니다.
public SchemeAsMultiScheme(Scheme scheme)
Scheme − kafka에서 소비 된 바이트 버퍼.
KafkaSpout API
KafkaSpout은 Storm과 통합되는 스파우트 구현입니다. kafka 토픽에서 메시지를 가져와 스톰 생태계에 튜플로 내 보냅니다. KafkaSpout는 SpoutConfig에서 구성 세부 정보를 가져옵니다.
다음은 간단한 Kafka 스파우트를 만드는 샘플 코드입니다.
// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);
//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts,
topicName, "/" + topicName UUID.randomUUID().toString());
//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
볼트 생성
Bolt는 튜플을 입력으로 받고, 튜플을 처리하고, 새 튜플을 출력으로 생성하는 구성 요소입니다. Bolts는 IRichBolt 인터페이스를 구현합니다. 이 프로그램에서는 두 개의 볼트 클래스 WordSplitter-Bolt 및 WordCounterBolt가 작업을 수행하는 데 사용됩니다.
IRichBolt 인터페이스에는 다음과 같은 방법이 있습니다.
Prepare− 볼트에 실행할 환경을 제공합니다. 실행자는이 메서드를 실행하여 스파우트를 초기화합니다.
Execute − 단일 튜플 입력을 처리합니다.
Cleanup − 볼트가 셧다운 될 때 호출됩니다.
declareOutputFields − 튜플의 출력 스키마를 선언합니다.
문장을 단어로 분할하는 로직을 구현하는 SplitBolt.java와 고유 한 단어를 분리하고 발생 횟수를 계산하는 로직을 구현하는 CountBolt.java를 생성 해 보겠습니다.
SplitBolt.java
import java.util.Map;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class SplitBolt implements IRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for(String word: words) {
word = word.trim();
if(!word.isEmpty()) {
word = word.toLowerCase();
collector.emit(new Values(word));
}
}
collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public void cleanup() {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
CountBolt.java
import java.util.Map;
import java.util.HashMap;
import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class CountBolt implements IRichBolt{
Map<String, Integer> counters;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String str = input.getString(0);
if(!counters.containsKey(str)){
counters.put(str, 1);
}else {
Integer c = counters.get(str) +1;
counters.put(str, c);
}
collector.ack(input);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counters.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
토폴로지에 제출
Storm 토폴로지는 기본적으로 Thrift 구조입니다. TopologyBuilder 클래스는 복잡한 토폴로지를 만드는 간단하고 쉬운 방법을 제공합니다. TopologyBuilder 클래스에는 스파우트 (setSpout)를 설정하고 볼트 (setBolt)를 설정하는 메서드가 있습니다. 마지막으로 TopologyBuilder에는 to-pology를 생성하는 createTopology가 있습니다. shuffleGrouping 및 fieldsGrouping 메서드는 주둥이와 볼트에 대한 스트림 그룹화를 설정하는 데 도움이됩니다.
Local Cluster− 개발 목적으로 LocalCluster
객체를 사용하여 로컬 클러스터를 생성 한 후 LocalCluster
클래스의 submitTopology
메소드를 사용하여 토폴로지를 제출할 수 있습니다.
KafkaStormSample.java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;
public class KafkaStormSample {
public static void main(String[] args) throws Exception{
Config config = new Config();
config.setDebug(true);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
String zkConnString = "localhost:2181";
String topic = "my-first-topic";
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,
UUID.randomUUID().toString());
kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
kafkaSpoutConfig.forceFromStart = true;
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());
Thread.sleep(10000);
cluster.shutdown();
}
}
컴파일을 이동하기 전에 Kakfa-Storm 통합에는 큐레이터 ZooKeeper 클라이언트 자바 라이브러리가 필요합니다. 큐레이터 버전 2.9.1은 Apache Storm 버전 0.9.5 (이 자습서에서 사용)를 지원합니다. 아래 지정된 jar 파일을 다운로드하고 Java 클래스 경로에 배치하십시오.
- curator-client-2.9.1.jar
- curator-framework-2.9.1.jar
종속성 파일을 포함시킨 후 다음 명령을 사용하여 프로그램을 컴파일하십시오.
javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java
실행
Kafka Producer CLI (이전 장에서 설명 됨)를 시작하고 my-first-topic
이라는 새 주제를 만들고 아래와 같이 몇 가지 샘플 메시지를 제공합니다.
hello
kafka
storm
spark
test message
another test message
이제 다음 명령을 사용하여 응용 프로그램을 실행하십시오-
java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*”:. KafkaStormSample
이 응용 프로그램의 샘플 출력은 다음과 같습니다.
storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2
이 장에서는 Apache Kafka를 Spark Streaming API와 통합하는 방법에 대해 설명합니다.
Spark 정보
Spark Streaming API는 실시간 데이터 스트림의 확장 가능하고 처리량이 높고 내결함성이있는 스트림 처리를 지원합니다. 데이터는 Kafka, Flume, Twitter 등과 같은 여러 소스에서 수집 할 수 있으며 map, reduce, join 및 window와 같은 고급 기능과 같은 복잡한 알고리즘을 사용하여 처리 할 수 있습니다. 마지막으로 처리 된 데이터는 파일 시스템, 데이터베이스 및 라이브 대시 보드로 푸시 될 수 있습니다. RDD (Resilient Distributed Dataset)는 Spark의 기본 데이터 구조입니다. 불변의 분산 된 개체 모음입니다. RDD의 각 데이터 세트는 클러스터의 다른 노드에서 계산 될 수있는 논리 파티션으로 나뉩니다.
Spark와 통합
Kafka는 Spark 스트리밍을위한 잠재적 메시징 및 통합 플랫폼입니다. Kafka는 실시간 데이터 스트림의 중앙 허브 역할을하며 Spark Streaming에서 복잡한 알고리즘을 사용하여 처리됩니다. 데이터가 처리되면 Spark Streaming은 결과를 또 다른 Kafka 주제에 게시하거나 HDFS, 데이터베이스 또는 대시 보드에 저장할 수 있습니다. 다음 다이어그램은 개념적 흐름을 보여줍니다.
이제 Kafka-Spark API를 자세히 살펴 보겠습니다.
SparkConf API
Spark 애플리케이션의 구성을 나타냅니다. 다양한 Spark 매개 변수를 키-값 쌍으로 설정하는 데 사용됩니다.
SparkConf
클래스에는 다음과 같은 메서드가 있습니다.
set(string key, string value) − 구성 변수를 설정합니다.
remove(string key) − 구성에서 키를 제거합니다.
setAppName(string name) − 애플리케이션의 애플리케이션 이름을 설정합니다.
get(string key) − 열쇠 받기
StreamingContext API
이것이 Spark 기능의 주요 진입 점입니다. SparkContext는 Spark 클러스터에 대한 연결을 나타내며 클러스터에서 RDD, 누산기 및 브로드 캐스트 변수를 만드는 데 사용할 수 있습니다. 서명은 아래와 같이 정의됩니다.
public StreamingContext(String master, String appName, Duration batchDuration,
String sparkHome, scala.collection.Seq<String> jars,
scala.collection.Map<String,String> environment)
master − 연결할 클러스터 URL (예 : mesos : // host : port, spark : // host : port, local [4]).
appName − 클러스터 웹 UI에 표시 할 작업 이름
batchDuration − 스트리밍 데이터가 배치로 분할되는 시간 간격
public StreamingContext(SparkConf conf, Duration batchDuration)
새 SparkContext에 필요한 구성을 제공하여 StreamingContext를 만듭니다.
conf − Spark 매개 변수
batchDuration − 스트리밍 데이터가 배치로 분할되는 시간 간격
KafkaUtils API
KafkaUtils API는 Kafka 클러스터를 Spark 스트리밍에 연결하는 데 사용됩니다. 이 API는 아래와 같이 정의 된 signifi-cant 메소드 createStream
시그니처를 가지고 있습니다.
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
StreamingContext ssc, String zkQuorum, String groupId,
scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
위에 표시된 방법은 Kafka Brokers에서 메시지를 가져 오는 입력 스트림을 만드는 데 사용됩니다.
ssc − StreamingContext 객체.
zkQuorum − 사육사 쿼럼.
groupId −이 소비자의 그룹 ID.
topics − 소비 할 주제지도를 반환합니다.
storageLevel − 수신 된 객체를 저장하는 데 사용할 저장 수준.
KafkaUtils API에는 수신자를 사용하지 않고 Kafka Brokers에서 직접 메시지를 가져 오는 입력 스트림을 만드는 데 사용되는 또 다른 메서드 createDirectStream이 있습니다. 이 스트림은 Kafka의 각 메시지가 변환에 정확히 한 번 포함되도록 보장 할 수 있습니다.
샘플 애플리케이션은 Scala에서 수행됩니다. 애플리케이션을 컴파일하려면 sbt
, scala 빌드 도구 (maven과 유사) 를 다운로드하여 설치하십시오 . 주요 애플리케이션 코드는 다음과 같습니다.
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
빌드 스크립트
spark-kafka 통합은 spark, spark 스트리밍 및 spark Kafka 통합 jar에 따라 다릅니다. 새 파일 build.sbt를
만들고 응용 프로그램 세부 정보와 해당 종속성을 지정합니다. SBT는
컴파일하고 응용 프로그램을 포장하는 동안 필요한 항아리를 다운로드합니다.
name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
편집 / 패키징
다음 명령을 실행하여 애플리케이션의 jar 파일을 컴파일하고 패키징합니다. 애플리케이션을 실행하려면 jar 파일을 Spark 콘솔에 제출해야합니다.
sbt package
Spark에 제출
Kafka Producer CLI (이전 장에서 설명)를 시작하고 my-first-topic
이라는 새 주제를 만들고 아래와 같이 몇 가지 샘플 메시지를 제공합니다.
Another spark test message
다음 명령을 실행하여 애플리케이션을 Spark 콘솔에 제출합니다.
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
이 애플리케이션의 샘플 출력은 다음과 같습니다.
spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..
실시간 애플리케이션을 분석하여 최신 트위터 피드와 해시 태그를 가져 오겠습니다. 이전에 Storm 및 Spark와 Kafka의 통합을 확인했습니다. 두 시나리오 모두에서 Kafka 생태계에 메시지를 보내기 위해 Kafka Producer (cli 사용)를 만들었습니다. 그런 다음 스톰 및 스파크 통합은 Kafka 소비자를 사용하여 메시지를 읽고이를 각각 스톰 및 스파크 생태계에 주입합니다. 그래서 실제로 우리는 Kafka Producer를 만들어야합니다.
- "Twitter Streaming API"를 사용하여 트위터 피드를 읽고,
- 피드 처리,
- HashTag를 추출하고
- Kafka로 보내십시오.
한 번 해시 태그는
카프카, 폭풍에 의해 수신 / 스파크의 통합은 INFOR-mation의를 받고 스톰 / 스파크 생태계에 보낼 수 있습니다.
Twitter 스트리밍 API
"Twitter Streaming API"는 모든 프로그래밍 언어로 액세스 할 수 있습니다. "twitter4j"는 "Twitter Streaming API"에 쉽게 액세스 할 수있는 Java 기반 모듈을 제공하는 오픈 소스 비공식 Java 라이브러리입니다. "twitter4j"는 트윗에 액세스 할 수있는 리스너 기반 프레임 워크를 제공합니다. "Twitter Streaming API"에 액세스하려면 Twitter 개발자 계정에 로그인해야하며 다음을 받아야합니다.OAuth 인증 세부 사항.
- Customerkey
- CustomerSecret
- AccessToken
- AccessTookenSecret
개발자 계정이 생성되면 "twitter4j"jar 파일을 다운로드하여 Java 클래스 경로에 배치합니다.
완전한 Twitter Kafka 생산자 코딩 (KafkaTwitterProducer.java)은 다음과 같습니다.
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import twitter4j.*;
import twitter4j.conf.*;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaTwitterProducer {
public static void main(String[] args) throws Exception {
LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Sta-tus>(1000);
if(args.length < 5){
System.out.println(
"Usage: KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret> <twitter-access-token>
<twitter-access-token-secret>
<topic-name> <twitter-search-keywords>");
return;
}
String consumerKey = args[0].toString();
String consumerSecret = args[1].toString();
String accessToken = args[2].toString();
String accessTokenSecret = args[3].toString();
String topicName = args[4].toString();
String[] arguments = args.clone();
String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true)
.setOAuthConsumerKey(consumerKey)
.setOAuthConsumerSecret(consumerSecret)
.setOAuthAccessToken(accessToken)
.setOAuthAccessTokenSecret(accessTokenSecret);
TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).get-Instance();
StatusListener listener = new StatusListener() {
@Override
public void onStatus(Status status) {
queue.offer(status);
// System.out.println("@" + status.getUser().getScreenName()
+ " - " + status.getText());
// System.out.println("@" + status.getUser().getScreen-Name());
/*for(URLEntity urle : status.getURLEntities()) {
System.out.println(urle.getDisplayURL());
}*/
/*for(HashtagEntity hashtage : status.getHashtagEntities()) {
System.out.println(hashtage.getText());
}*/
}
@Override
public void onDeletionNotice(StatusDeletionNotice statusDeletion-Notice) {
// System.out.println("Got a status deletion notice id:"
+ statusDeletionNotice.getStatusId());
}
@Override
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
// System.out.println("Got track limitation notice:" +
num-berOfLimitedStatuses);
}
@Override
public void onScrubGeo(long userId, long upToStatusId) {
// System.out.println("Got scrub_geo event userId:" + userId +
"upToStatusId:" + upToStatusId);
}
@Override
public void onStallWarning(StallWarning warning) {
// System.out.println("Got stall warning:" + warning);
}
@Override
public void onException(Exception ex) {
ex.printStackTrace();
}
};
twitterStream.addListener(listener);
FilterQuery query = new FilterQuery().track(keyWords);
twitterStream.filter(query);
Thread.sleep(5000);
//Add Kafka producer config settings
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
int i = 0;
int j = 0;
while(i < 10) {
Status ret = queue.poll();
if (ret == null) {
Thread.sleep(100);
i++;
}else {
for(HashtagEntity hashtage : ret.getHashtagEntities()) {
System.out.println("Hashtag: " + hashtage.getText());
producer.send(new ProducerRecord<String, String>(
top-icName, Integer.toString(j++), hashtage.getText()));
}
}
}
producer.close();
Thread.sleep(5000);
twitterStream.shutdown();
}
}
편집
다음 명령을 사용하여 응용 프로그램을 컴파일하십시오-
javac -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:. KafkaTwitterProducer.java
실행
두 개의 콘솔을 엽니 다. 하나의 콘솔에서 아래와 같이 위에서 컴파일 된 응용 프로그램을 실행합니다.
java -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:
. KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret>
<twitter-access-token>
<twitter-ac-cess-token-secret>
my-first-topic food
다른 창에서 이전 장에서 설명한 Spark / Storm 애플리케이션 중 하나를 실행합니다. 주목해야 할 요점은 사용 된 주제가 두 경우 모두 동일해야한다는 것입니다. 여기서는 주제 이름으로 "my-first-topic"을 사용했습니다.
산출
이 애플리케이션의 출력은 키워드와 트위터의 현재 피드에 따라 달라집니다. 아래에 샘플 출력이 지정되어 있습니다 (폭풍 통합).
. . .
food : 1
foodie : 2
burger : 1
. . .
Kafka Tool은“org.apache.kafka.tools. *. 도구는 시스템 도구와 복제 도구로 분류됩니다.
시스템 도구
시스템 도구는 실행 클래스 스크립트를 사용하여 명령 줄에서 실행할 수 있습니다. 구문은 다음과 같습니다-
bin/kafka-run-class.sh package.class - - options
시스템 도구 중 일부는 다음과 같습니다.
Kafka Migration Tool −이 도구는 브로커를 한 버전에서 다른 버전으로 마이그레이션하는 데 사용됩니다.
Mirror Maker −이 도구는 한 Kafka 클러스터를 다른 클러스터로 미러링하는 데 사용됩니다.
Consumer Offset Checker −이 도구는 지정된 주제 및 소비자 그룹 세트에 대한 소비자 그룹, 주제, 파티션, 오프셋, logSize, 소유자를 표시합니다.
복제 도구
Kafka 복제는 높은 수준의 디자인 도구입니다. 복제 도구를 추가하는 목적은 내구성과 가용성을 높이는 것입니다. 복제 도구 중 일부는 다음과 같습니다.
Create Topic Tool − 이것은 기본 파티션 수, 복제 인자로 토픽을 생성하고 Kafka의 기본 체계를 사용하여 복제 할당을 수행합니다.
List Topic Tool−이 도구는 주어진 주제 목록에 대한 정보를 나열합니다. 명령 줄에 항목이 제공되지 않으면 도구는 Zookeeper를 쿼리하여 모든 항목을 가져오고 해당 정보를 나열합니다. 도구가 표시하는 필드는 주제 이름, 파티션, 리더, 복제본, isr입니다.
Add Partition Tool− 토픽 생성, 토픽에 대한 파티션 수를 지정해야합니다. 나중에 토픽의 볼륨이 증가 할 때 토픽에 대해 더 많은 파티션이 필요할 수 있습니다. 이 도구는 특정 주제에 대해 더 많은 파티션을 추가하는 데 도움이되며 추가 된 파티션의 수동 복제본 할당도 허용합니다.
Kafka는 오늘날 최고의 산업용 애플리케이션을 많이 지원합니다. 이 장에서는 Kafka의 가장 주목할만한 응용 프로그램에 대한 간략한 개요를 제공합니다.
트위터
Twitter는 사용자 트윗을주고받을 수있는 플랫폼을 제공하는 온라인 소셜 네트워킹 서비스입니다. 등록 된 사용자는 트윗을 읽고 게시 할 수 있지만 등록되지 않은 사용자는 트윗을 읽을 수만 있습니다. Twitter는 스트림 처리 인프라의 일부로 Storm-Kafka를 사용합니다.
Apache Kafka는 LinkedIn에서 활동 스트림 데이터 및 운영 메트릭에 사용됩니다. Kafka 메시지 전송 시스템은 LinkedIn Newsfeed, LinkedIn Today와 같은 다양한 제품과 온라인 메시지 소비 및 Hadoop과 같은 오프라인 분석 시스템으로 LinkedIn을 지원합니다. Kafka의 강력한 내구성도 LinkedIn과 관련된 핵심 요소 중 하나입니다.
넷플릭스
Netflix는 미국의 다국적 주문형 인터넷 스트리밍 미디어 제공 업체입니다. Netflix는 실시간 모니터링 및 이벤트 처리를 위해 Kafka를 사용합니다.
Mozilla
Mozilla는 1998 년 Netscape 회원이 만든 무료 소프트웨어 커뮤니티입니다. Kafka는 곧 Telemetry, Test Pilot 등과 같은 프로젝트를 위해 최종 사용자의 브라우저에서 성능 및 사용 데이터를 수집하기 위해 Mozilla 현재 프로덕션 시스템의 일부를 대체 할 것입니다.
신탁
Oracle은 OSB (Oracle Service Bus)라는 엔터프라이즈 서비스 버스 제품에서 Kafka에 대한 기본 연결을 제공하므로 개발자는 OSB 기본 제공 중개 기능을 활용하여 단계적 데이터 파이프 라인을 구현할 수 있습니다.