Apache Kafka - szybki przewodnik

W Big Data wykorzystywane są ogromne ilości danych. Jeśli chodzi o dane, mamy dwa główne wyzwania: pierwsze wyzwanie polega na tym, jak zebrać duże ilości danych, a drugie to analiza zebranych danych. Aby sprostać tym wyzwaniom, potrzebujesz systemu przesyłania wiadomości.

Kafka jest przeznaczona dla rozproszonych systemów o dużej przepustowości. Kafka zwykle działa bardzo dobrze jako zamiennik bardziej tradycyjnego brokera wiadomości. W porównaniu z innymi systemami przesyłania wiadomości, Kafka ma lepszą przepustowość, wbudowane partycjonowanie, replikację i nieodłączną odporność na błędy, co sprawia, że ​​dobrze sprawdza się w aplikacjach przetwarzania wiadomości na dużą skalę.

Co to jest system przesyłania wiadomości?

System przesyłania wiadomości jest odpowiedzialny za przesyłanie danych z jednej aplikacji do drugiej, więc aplikacje mogą skupić się na danych, ale nie martwią się, jak je udostępniać. Rozproszone przesyłanie wiadomości opiera się na koncepcji niezawodnego kolejkowania wiadomości. Wiadomości są kolejkowane asynchronicznie między aplikacjami klienckimi a systemem przesyłania wiadomości. Dostępne są dwa typy wzorców przesyłania komunikatów - jeden to punkt do punktu, a drugi to system przesyłania komunikatów publikuj-subskrybuj (pub-sub). Większość wzorców przesyłania wiadomości jest następującapub-sub.

Point to Point Messaging System

W systemie typu punkt-punkt komunikaty są utrwalane w kolejce. Jeden lub więcej konsumentów może konsumować komunikaty w kolejce, ale konkretny komunikat może być używany tylko przez jednego konsumenta. Gdy konsument przeczyta wiadomość w kolejce, znika ona z tej kolejki. Typowym przykładem tego systemu jest system przetwarzania zamówień, w którym każde zamówienie będzie przetwarzane przez jeden podmiot przetwarzający zamówienia, ale jednocześnie może działać wiele procesorów zamówień. Poniższy diagram przedstawia strukturę.

System wiadomości publikuj-subskrybuj

W systemie publikowania i subskrybowania komunikaty są utrwalane w temacie. W przeciwieństwie do systemu punkt-punkt, konsumenci mogą subskrybować jeden lub więcej tematów i wykorzystywać wszystkie wiadomości w tym temacie. W systemie publikuj-subskrybuj producenci wiadomości nazywani są wydawcami, a odbiorcy wiadomości - subskrybentami. Przykładem z życia jest Dish TV, które publikuje różne kanały, takie jak sport, filmy, muzyka itp., A każdy może subskrybować własny zestaw kanałów i pobierać je, gdy tylko są dostępne.

Co to jest Kafka?

Apache Kafka to rozproszony system przesyłania wiadomości typu publikuj-subskrybuj oraz solidną kolejkę, która może obsługiwać duże ilości danych i umożliwia przekazywanie wiadomości z jednego punktu końcowego do drugiego. Kafka nadaje się zarówno do konsumpcji wiadomości offline, jak i online. Komunikaty Kafka są utrwalane na dysku i replikowane w klastrze, aby zapobiec utracie danych. Kafka jest zbudowana na bazie usługi synchronizacji ZooKeeper. Bardzo dobrze integruje się z Apache Storm i Spark w celu analizy danych strumieniowych w czasie rzeczywistym.

Korzyści

Oto kilka zalet Kafki -

  • Reliability - Kafka jest rozproszona, podzielona na partycje, replikowana i odporna na błędy.

  • Scalability - System przesyłania wiadomości Kafka łatwo się skaluje bez przestojów.

  • Durability- Kafka korzysta z dziennika dystrybucji rozproszonych, co oznacza, że ​​komunikaty pozostają na dysku tak szybko, jak to możliwe, dzięki czemu są trwałe.

  • Performance- Kafka ma wysoką przepustowość zarówno w przypadku publikowania, jak i subskrybowania wiadomości. Utrzymuje stabilną wydajność, nawet przechowywanych jest wiele TB wiadomości.

Kafka jest bardzo szybka i gwarantuje zero przestojów i brak utraty danych.

Przypadków użycia

Kafka może być używany w wielu przypadkach użycia. Niektóre z nich są wymienione poniżej -

  • Metrics- Kafka jest często używana do danych monitorowania operacyjnego. Obejmuje to agregowanie statystyk z aplikacji rozproszonych w celu tworzenia scentralizowanych źródeł danych operacyjnych.

  • Log Aggregation Solution - Kafka może być używany w całej organizacji do zbierania dzienników z wielu usług i udostępniania ich w standardowym formacie wielu konsumentom.

  • Stream Processing- Popularne platformy, takie jak Storm i Spark Streaming, odczytują dane z tematu, przetwarza je i zapisują przetworzone dane w nowym temacie, gdzie stają się dostępne dla użytkowników i aplikacji. Duża trwałość Kafki jest również bardzo przydatna w kontekście przetwarzania strumieniowego.

Potrzeba Kafki

Kafka to ujednolicona platforma do obsługi wszystkich źródeł danych w czasie rzeczywistym. Kafka obsługuje dostarczanie wiadomości o małych opóźnieniach i daje gwarancję odporności na błędy w przypadku awarii maszyny. Ma możliwość obsługi dużej liczby różnorodnych konsumentów. Kafka jest bardzo szybka, wykonuje 2 miliony zapisów / sek. Kafka zachowuje wszystkie dane na dysku, co zasadniczo oznacza, że ​​wszystkie zapisy trafiają do pamięci podręcznej strony systemu operacyjnego (RAM). Dzięki temu bardzo wydajne jest przesyłanie danych z pamięci podręcznej stron do gniazda sieciowego.

Zanim zagłębisz się w Kafkę, musisz zapoznać się z podstawowymi terminologiami, takimi jak tematy, brokerzy, producenci i konsumenci. Poniższy diagram ilustruje główne terminologie, a tabela szczegółowo opisuje składniki diagramu.

Na powyższym diagramie temat jest podzielony na trzy partycje. Partycja 1 ma dwa współczynniki przesunięcia 0 i 1. Partycja 2 ma cztery współczynniki przesunięcia 0, 1, 2 i 3. Partycja 3 ma jeden współczynnik przesunięcia 0. Identyfikator repliki jest taki sam jak identyfikator serwera, który ją obsługuje.

Załóżmy, że jeśli współczynnik replikacji tematu jest ustawiony na 3, wówczas Kafka utworzy 3 identyczne repliki każdej partycji i umieści je w klastrze, aby udostępnić je dla wszystkich swoich operacji. Aby zrównoważyć obciążenie w klastrze, każdy broker przechowuje co najmniej jedną z tych partycji. Wielu producentów i konsumentów może jednocześnie publikować i pobierać wiadomości.

S.Nr Komponenty i opis
1

Topics

Strumień wiadomości należących do określonej kategorii nazywany jest tematem. Dane są przechowywane w tematach.

Tematy są podzielone na partycje. Dla każdego tematu Kafka utrzymuje minimalną jedną partycję. Każda taka partycja zawiera komunikaty w niezmiennej uporządkowanej kolejności. Partycja jest zaimplementowana jako zestaw plików segmentów o równych rozmiarach.

2

Partition

Tematy mogą mieć wiele partycji, więc może obsłużyć dowolną ilość danych.

3

Partition offset

Każda wiadomość podzielona na partycje ma unikalny identyfikator sekwencji nazywany przesunięciem .

4

Replicas of partition

Repliki to nic innego jak kopie zapasowe partycji. Repliki nigdy nie odczytują ani nie zapisują danych. Służą do zapobiegania utracie danych.

5

Brokers

  • Brokerzy to prosty system odpowiedzialny za przechowywanie publikowanych danych. Każdy broker może mieć zero lub więcej partycji na temat. Załóżmy, że jeśli w temacie jest N partycji i N brokerów, każdy broker będzie miał jedną partycję.

  • Załóżmy, że w temacie jest N partycji i więcej niż N brokerów (n + m), pierwszy broker N będzie miał jedną partycję, a następny broker M nie będzie miał żadnej partycji dla tego konkretnego tematu.

  • Załóżmy, że w temacie jest N partycji i mniej niż N brokerów (nm), każdy broker będzie miał między sobą jedną lub więcej partycji. Ten scenariusz nie jest zalecany ze względu na nierówny rozkład obciążenia między brokerem.

6

Kafka Cluster

Kafka ma więcej niż jednego brokera nazywane klastrem Kafka. Klaster Kafka można rozbudowywać bez przestojów. Te klastry służą do zarządzania trwałością i replikacją danych komunikatów.

7

Producers

Producenci są wydawcami wiadomości do co najmniej jednego tematu platformy Kafka. Producenci wysyłają dane do brokerów Kafka. Za każdym razem, gdy producent publikuje wiadomość dla brokera, ten po prostu dołącza wiadomość do ostatniego pliku segmentu. Właściwie wiadomość zostanie dołączona do partycji. Producent może również wysyłać wiadomości do wybranej przez siebie partycji.

8

Consumers

Konsumenci czytają dane od brokerów. Konsumenci subskrybują jeden lub więcej tematów i korzystają z opublikowanych wiadomości, pobierając dane od brokerów.

9

Leader

Leader to węzeł odpowiedzialny za wszystkie odczyty i zapisy dla danej partycji. Każda partycja ma jeden serwer pełniący rolę lidera.

10

Follower

Węzeł, który podąża za instrukcjami lidera, nazywany jest podążającym. Jeśli lider zawiedzie, jeden ze zwolenników automatycznie zostanie nowym liderem. Obserwujący działa jak zwykły konsument, pobiera komunikaty i aktualizuje własny magazyn danych.

Spójrz na poniższą ilustrację. Pokazuje diagram skupień Kafki.

Poniższa tabela opisuje każdy z elementów pokazanych na powyższym schemacie.

S.Nr Komponenty i opis
1

Broker

Klaster Kafka zazwyczaj składa się z wielu brokerów w celu utrzymania równowagi obciążenia. Brokerzy Kafka są bezpaństwowi, więc używają ZooKeepera do utrzymywania stanu klastra. Jedna instancja brokera Kafka może obsłużyć setki tysięcy odczytów i zapisów na sekundę, a każdy bro-ker może obsłużyć TB wiadomości bez wpływu na wydajność. Wybór lidera brokera Kafka może zostać przeprowadzony przez ZooKeeper.

2

ZooKeeper

ZooKeeper służy do zarządzania i koordynowania brokera Kafka. Usługa ZooKeeper służy głównie do powiadamiania producenta i konsumenta o obecności nowego brokera w systemie Kafka lub awarii brokera w systemie Kafka. Zgodnie z powiadomieniem otrzymanym przez Zookeepera o obecności lub awarii brokera, producent i konsument podejmują decyzję i zaczynają koordynować swoje zadania z innym brokerem.

3

Producers

Producenci przesyłają dane do brokerów. Po uruchomieniu nowego brokera wszyscy producenci przeszukują go i automatycznie wysyłają wiadomość do tego nowego brokera. Producent Kafki nie czeka na potwierdzenia od brokera i wysyła wiadomości tak szybko, jak tylko broker może je obsłużyć.

4

Consumers

Ponieważ brokerzy Kafka są bezstanowe, co oznacza, że ​​konsument musi utrzymywać liczbę wykorzystanych wiadomości, używając przesunięcia partycji. Jeśli konsument potwierdza określone przesunięcie komunikatu, oznacza to, że konsument wykorzystał wszystkie poprzednie komunikaty. Konsument wysyła asynchroniczne żądanie ściągnięcia do brokera, aby mieć bufor bajtów gotowy do wykorzystania. Konsumenci mogą przewinąć lub przeskoczyć do dowolnego punktu w partycji, po prostu podając wartość przesunięcia. Wartość offsetu konsumenta jest zgłaszana przez ZooKeeper.

Na razie omówiliśmy podstawowe koncepcje Kafki. Rzućmy teraz trochę światła na przepływ pracy w Kafce.

Kafka to po prostu zbiór tematów podzielonych na jedną lub więcej partycji. Partycja Kafka to uporządkowana liniowo sekwencja komunikatów, w której każda wiadomość jest identyfikowana przez swój indeks (nazywany przesunięciem). Wszystkie dane w klastrze Kafka to rozłączna suma partycji. Przychodzące wiadomości są zapisywane na końcu partycji, a wiadomości są kolejno odczytywane przez konsumentów. Trwałość zapewnia replikacja wiadomości do różnych brokerów.

Kafka zapewnia system przesyłania wiadomości oparty na pub-sub i kolejce w sposób szybki, niezawodny, trwały, odporny na błędy i bez przestojów. W obu przypadkach producenci po prostu wysyłają wiadomość na dany temat, a konsument może wybrać dowolny typ systemu przesyłania wiadomości w zależności od potrzeb. Postępujmy zgodnie z instrukcjami w następnej sekcji, aby zrozumieć, w jaki sposób konsument może wybrać wybrany przez siebie system przesyłania wiadomości.

Przepływ pracy wiadomości Pub-Sub

Poniżej przedstawiono krokowy przepływ pracy wiadomości Pub-Sub -

  • Producenci wysyłają wiadomość do tematu w regularnych odstępach czasu.

  • Broker Kafka przechowuje wszystkie komunikaty na partycjach skonfigurowanych dla tego konkretnego tematu. Zapewnia równy podział wiadomości między partycjami. Jeśli producent wyśle ​​dwie wiadomości i istnieją dwie partycje, Kafka zapisze jedną wiadomość w pierwszej partycji, a drugą w drugiej.

  • Konsument subskrybuje określony temat.

  • Gdy konsument zasubskrybuje temat, Kafka zapewni konsumentowi bieżące przesunięcie tematu, a także zapisze przesunięcie w zespole Zookeeper.

  • Konsument będzie prosić Kafkę w regularnych odstępach czasu (np. 100 ms) o nowe wiadomości.

  • Gdy Kafka otrzyma wiadomości od producentów, przekazuje je konsumentom.

  • Konsument otrzyma wiadomość i ją przetworzy.

  • Po przetworzeniu wiadomości konsument wyśle ​​potwierdzenie do brokera Kafka.

  • Gdy Kafka otrzyma potwierdzenie, zmienia przesunięcie na nową wartość i aktualizuje ją w Zookeeper. Ponieważ przesunięcia są utrzymywane w Zookeeper, konsument może poprawnie odczytać następną wiadomość nawet podczas ataku serwera.

  • Powyższy przepływ będzie się powtarzał, dopóki konsument nie zatrzyma żądania.

  • Konsument ma możliwość przewinięcia / przeskoczenia do żądanego przesunięcia tematu w dowolnym momencie i przeczytania wszystkich kolejnych wiadomości.

Przepływ wiadomości w kolejce / Grupa konsumentów

W systemie obsługi wiadomości w kolejce zamiast pojedynczego konsumenta grupa konsumentów o tym samym identyfikatorze grupy będzie subskrybować temat. Mówiąc prościej, konsumenci subskrybujący temat z tym samym identyfikatorem grupy są traktowani jako pojedyncza grupa, a wiadomości są między nimi współdzielone. Sprawdźmy rzeczywisty przepływ pracy tego systemu.

  • Producenci wysyłają wiadomość do tematu w regularnych odstępach czasu.

  • Kafka przechowuje wszystkie wiadomości na partycjach skonfigurowanych dla tego konkretnego tematu, podobnie jak we wcześniejszym scenariuszu.

  • Pojedynczy subskrybuje konsumenta do konkretnego tematu, załóżmy Topic-01 z ID grupy jako grupy 1 .

  • Kafka współdziała z konsumentem w taki sam sposób, jak wiadomości Pub-Sub, dopóki nowy konsument nie zasubskrybuje tego samego tematu, Temat-01 z tym samym identyfikatorem grupy co Grupa-1 .

  • Po przybyciu nowego konsumenta Kafka przełącza swoją pracę w tryb udostępniania i udostępnia dane między dwoma konsumentami. To udostępnianie będzie kontynuowane, dopóki liczba konsumentów nie osiągnie liczby partycji skonfigurowanej dla tego konkretnego tematu.

  • Gdy liczba konsumentów przekroczy liczbę partycji, nowy konsument nie otrzyma żadnej kolejnej wiadomości, dopóki którykolwiek z istniejących konsumentów nie zrezygnuje z subskrypcji. Ten scenariusz powstaje, ponieważ każdemu konsumentowi w Kafce zostanie przypisana co najmniej jedna partycja, a gdy wszystkie partycje zostaną przypisane do istniejących odbiorców, nowi konsumenci będą musieli poczekać.

  • Ta funkcja jest również nazywana grupą konsumentów . W ten sam sposób Kafka zapewni to, co najlepsze z obu systemów w bardzo prosty i wydajny sposób.

Rola ZooKeeper

Krytyczną zależnością Apache Kafka jest Apache Zookeeper, który jest rozproszoną usługą konfiguracji i synchronizacji. Zookeeper służy jako interfejs koordynacyjny między brokerami Kafki a konsumentami. Serwery Kafka udostępniają informacje za pośrednictwem klastra Zookeeper. Kafka przechowuje podstawowe metadane w Zookeeper, takie jak informacje o tematach, brokerach, offsetach konsumenckich (czytnikach kolejek) i tak dalej.

Ponieważ wszystkie krytyczne informacje są przechowywane w Zookeeper i zwykle replikuje te dane w swoim zespole, awaria brokera Kafka / Zookeepera nie wpływa na stan klastra Kafka. Kafka przywróci stan po ponownym uruchomieniu Zookeeper. Daje to zero przestojów dla Kafki. Wybór lidera między brokerem Kafka odbywa się również przy użyciu Zookeepera w przypadku niepowodzenia lidera.

Aby dowiedzieć się więcej na Heca, zapoznaj zookeeper

Kontynuujmy dalej, jak zainstalować Javę, ZooKeepera i Kafkę na twoim komputerze w następnym rozdziale.

Poniżej przedstawiono kroki instalacji oprogramowania Java na komputerze.

Krok 1 - weryfikacja instalacji Java

Mamy nadzieję, że masz już zainstalowaną Javę na swoim komputerze, więc po prostu zweryfikuj ją za pomocą następującego polecenia.

$ java -version

Jeśli java została pomyślnie zainstalowana na twoim komputerze, możesz zobaczyć wersję zainstalowanej Javy.

Krok 1.1 - Pobierz JDK

Jeśli Java nie została pobrana, pobierz najnowszą wersję JDK, odwiedzając poniższe łącze i pobierz najnowszą wersję.

http://www.oracle.com/technetwork/java/javase/downloads/index.html

Obecnie najnowsza wersja to JDK 8u 60, a plik to „jdk-8u60-linux-x64.tar.gz”. Pobierz plik na swój komputer.

Krok 1.2 - Wyodrębnij pliki

Zwykle pobierane pliki są przechowywane w folderze pobierania, zweryfikuj go i wyodrębnij ustawienia tar za pomocą następujących poleceń.

$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz

Krok 1.3 - Przejdź do katalogu opcji

Aby udostępnić oprogramowanie Java wszystkim użytkownikom, przenieś wyodrębnioną zawartość Java do folderu usr / local / java /.

$ su
password: (type password of root user)
$ mkdir /opt/jdk $ mv jdk-1.8.0_60 /opt/jdk/

Krok 1.4 - Ustaw ścieżkę

Aby ustawić ścieżkę i zmienne JAVA_HOME, dodaj następujące polecenia do pliku ~ / .bashrc.

export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin

Teraz zastosuj wszystkie zmiany w aktualnie działającym systemie.

$ source ~/.bashrc

Krok 1.5 - Alternatywy dla języka Java

Użyj następującego polecenia, aby zmienić alternatywy Java.

update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100

Step 1.6 - Teraz sprawdź java za pomocą polecenia weryfikacyjnego (wersja-java) opisanego w kroku 1.

Krok 2 - Instalacja oprogramowania ZooKeeper Framework

Krok 2.1 - Pobierz ZooKeeper

Aby zainstalować środowisko ZooKeeper na swoim komputerze, kliknij poniższe łącze i pobierz najnowszą wersję ZooKeeper.

http://zookeeper.apache.org/releases.html

Obecnie najnowsza wersja ZooKeepera to 3.4.6 (ZooKeeper-3.4.6.tar.gz).

Krok 2.2 - Wypakuj plik tar

Wyodrębnij plik tar za pomocą następującego polecenia

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz $ cd zookeeper-3.4.6
$ mkdir data

Krok 2.3 - Utwórz plik konfiguracyjny

Otwórz plik konfiguracyjny o nazwie conf / zoo.cfg za pomocą polecenia vi „conf / zoo.cfg” i wszystkich poniższych parametrów, aby ustawić je jako punkt początkowy.

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

Po pomyślnym zapisaniu pliku konfiguracyjnego i ponownym powrocie do terminala możesz uruchomić serwer zookeeper.

Krok 2.4 - Uruchom serwer ZooKeeper

$ bin/zkServer.sh start

Po wykonaniu tego polecenia otrzymasz odpowiedź, jak pokazano poniżej -

$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg $ Starting zookeeper ... STARTED

Krok 2.5 - Uruchom CLI

$ bin/zkCli.sh

Po wpisaniu powyższego polecenia zostaniesz połączony z serwerem zookeeper i otrzymasz poniższą odpowiedź.

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

Krok 2.6 - Zatrzymaj serwer Zookeeper

Po podłączeniu serwera i wykonaniu wszystkich operacji możesz zatrzymać serwer zookeeper za pomocą następującego polecenia -

$ bin/zkServer.sh stop

Teraz pomyślnie zainstalowałeś Javę i ZooKeepera na swoim komputerze. Zobaczmy, jak zainstalować Apache Kafka.

Krok 3 - Instalacja Apache Kafka

Kontynuujmy następujące kroki, aby zainstalować Kafkę na twoim komputerze.

Krok 3.1 - Pobierz Kafkę

Aby zainstalować Kafkę na swoim komputerze, kliknij poniższy link -

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

Teraz najnowsza wersja, tj. - kafka_2.11_0.9.0.0.tgz zostanie pobrany na twój komputer.

Krok 3.2 - Rozpakuj plik tar

Wyodrębnij plik tar za pomocą następującego polecenia -

$ cd opt/ $ tar -zxf kafka_2.11.0.9.0.0 tar.gz
$ cd kafka_2.11.0.9.0.0

Teraz pobrałeś najnowszą wersję Kafki na swój komputer.

Krok 3.3 - Uruchom serwer

Możesz uruchomić serwer, wydając następujące polecenie -

$ bin/kafka-server-start.sh config/server.properties

Po uruchomieniu serwera na ekranie zobaczysz poniższą odpowiedź -

$ bin/kafka-server-start.sh config/server.properties
[2016-01-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
…………………………………………….
…………………………………………….

Krok 4 - Zatrzymaj serwer

Po wykonaniu wszystkich operacji możesz zatrzymać serwer za pomocą następującego polecenia -

$ bin/kafka-server-stop.sh config/server.properties

Teraz, gdy omówiliśmy już instalację Kafki, w następnym rozdziale możemy dowiedzieć się, jak wykonywać podstawowe operacje na Kafce.

Najpierw zacznijmy implementować konfigurację brokera z jednym węzłem i jednym węzłem, a następnie przeprowadzimy migrację naszej konfiguracji do konfiguracji brokerów z jednym węzłem i wieloma.

Miejmy nadzieję, że już zainstalowałbyś Javę, ZooKeepera i Kafkę na swoim komputerze. Przed przejściem do konfiguracji klastra Kafka, najpierw musisz uruchomić ZooKeeper, ponieważ klaster Kafka używa ZooKeeper.

Uruchom ZooKeeper

Otwórz nowy terminal i wpisz następujące polecenie -

bin/zookeeper-server-start.sh config/zookeeper.properties

Aby uruchomić Kafka Broker, wpisz następujące polecenie -

bin/kafka-server-start.sh config/server.properties

Po uruchomieniu Kafka Broker wpisz polecenie jps na terminalu ZooKeeper, a zobaczysz następującą odpowiedź -

821 QuorumPeerMain
928 Kafka
931 Jps

Teraz możesz zobaczyć dwa demony działające na terminalu, gdzie QuorumPeerMain to demon ZooKeeper, a drugi to demon Kafka.

Konfiguracja brokera z pojedynczym węzłem

W tej konfiguracji masz pojedynczą instancję ZooKeepera i identyfikatora brokera. Oto kroki, aby go skonfigurować -

Creating a Kafka Topic- Kafka udostępnia narzędzie wiersza poleceń o nazwie kafka-topics.sh do tworzenia tematów na serwerze. Otwórz nowy terminal i wpisz poniższy przykład.

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   
--partitions 1 --topic Hello-Kafka

Właśnie utworzyliśmy temat o nazwie Hello-Kafka z jedną partycją i jednym współczynnikiem repliki. Powyższy utworzony wynik będzie podobny do następującego wyniku -

Output- Utworzono temat Hello-Kafka

Po utworzeniu tematu można uzyskać powiadomienie w oknie terminala brokera Kafka i dziennik utworzonego tematu określonego w „/ tmp / kafka-logs /” w pliku config / server.properties.

Lista tematów

Aby uzyskać listę tematów na serwerze Kafka, możesz użyć następującego polecenia -

Syntax

bin/kafka-topics.sh --list --zookeeper localhost:2181

Output

Hello-Kafka

Ponieważ utworzyliśmy temat, wyświetli się tylko Hello-Kafka . Załóżmy, że jeśli utworzysz więcej niż jeden temat, w wyniku otrzymasz nazwy tematów.

Uruchom producenta, aby wysyłać wiadomości

Syntax

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

Z powyższej składni wymagane są dwa główne parametry dla klienta wiersza poleceń producenta -

Broker-list- Lista brokerów, do których chcemy wysyłać wiadomości. W tym przypadku mamy tylko jednego brokera. Plik Config / server.properties zawiera identyfikator portu brokera, ponieważ wiemy, że nasz broker nasłuchuje na porcie 9092, więc możesz go określić bezpośrednio.

Nazwa tematu - oto przykład nazwy tematu.

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

Producent będzie czekał na dane wejściowe ze stdin i publikuje w klastrze Kafka. Domyślnie każda nowa linia jest publikowana jako nowa wiadomość, a następnie domyślne właściwości producenta są określone w pliku config / production.properties . Teraz możesz wpisać kilka wierszy wiadomości w terminalu, jak pokazano poniżej.

Output

$ bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

Rozpocznij konsumenta, aby otrzymywać wiadomości

Podobnie jak w przypadku producenta, domyślne właściwości konsumenta są określone w pliku config / consumer.proper-ties . Otwórz nowy terminal i wpisz poniższą składnię do używania wiadomości.

Syntax

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name 
--from-beginning

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka 
--from-beginning

Output

Hello
My first message
My second message

Wreszcie możesz wprowadzić wiadomości z terminala producenta i zobaczyć je pojawiające się w terminalu konsumenta. W tej chwili masz bardzo dobrą wiedzę na temat klastra z jednym węzłem z jednym brokerem. Przejdźmy teraz do konfiguracji wielu brokerów.

Konfiguracja brokerów z jednym węzłem i wieloma

Przed przejściem do konfiguracji klastra wielu brokerów, najpierw uruchom serwer ZooKeeper.

Create Multiple Kafka Brokers- Mamy już jedną instancję brokera Kafka w konfiguracji / server.properties. Teraz potrzebujemy wielu instancji brokera, więc skopiuj istniejący plik server.prop-erties do dwóch nowych plików konfiguracyjnych i zmień jego nazwę na server-one.properties i server-two.prop-erties. Następnie edytuj oba nowe pliki i przypisz następujące zmiany -

config / server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config / server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

Start Multiple Brokers- Po wprowadzeniu wszystkich zmian na trzech serwerach otwórz trzy nowe terminale, aby po kolei uruchomić każdego brokera.

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

Teraz mamy trzech różnych brokerów działających na komputerze. Spróbuj sam, aby sprawdzić wszystkie demony, wpisującjps na terminalu ZooKeeper, wtedy zobaczysz odpowiedź.

Tworzenie tematu

Przypiszmy wartość współczynnika replikacji jako trzy dla tego tematu, ponieważ mamy uruchomionych trzech różnych brokerów. Jeśli masz dwóch brokerów, przypisana wartość repliki będzie wynosić dwa.

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic Multibrokerapplication

Output

created topic “Multibrokerapplication”

Polecenie Opisz służy do sprawdzenia, który broker nasłuchuje aktualnie utworzonego tematu, jak pokazano poniżej -

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Output

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1

Z powyższego wyniku możemy wywnioskować, że pierwsza linia zawiera podsumowanie wszystkich partycji, pokazując nazwę tematu, liczbę partycji i współczynnik replikacji, który już wybraliśmy. W drugim wierszu każdy węzeł będzie liderem w losowo wybranej części partycji.

W naszym przypadku widzimy, że nasz pierwszy broker (z broker.id 0) jest liderem. Następnie Replicas: 0,2,1 oznacza, że ​​wszyscy brokerzy powielają temat, w końcu Isr jest zbiorem replik synchronizowanych . Cóż, to podzbiór replik, które obecnie są żywe i złapane przez lidera.

Uruchom producenta, aby wysyłać wiadomości

Ta procedura pozostaje taka sama, jak w przypadku konfiguracji z jednym brokerem.

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Multibrokerapplication

Output

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

Rozpocznij konsumenta, aby otrzymywać wiadomości

Ta procedura pozostaje taka sama, jak pokazano w konfiguracji jednego brokera.

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion --from-beginning

Output

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

Podstawowe operacje na tematach

W tym rozdziale omówimy różne podstawowe operacje tematyczne.

Modyfikowanie tematu

Jak już wiesz, jak utworzyć temat w klastrze Kafka. Teraz zmodyfikujmy utworzony temat za pomocą następującego polecenia

Syntax

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name 
--parti-tions count

Example

We have already created a topic “Hello-Kafka” with single partition count and one replica factor. 
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181 
--alter --topic Hello-kafka --parti-tions 2

Output

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

Usuwanie tematu

Aby usunąć temat, możesz użyć następującej składni.

Syntax

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

Example

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

Output

> Topic Hello-kafka marked for deletion

Note −Nie będzie to miało wpływu, jeśli delete.topic.enable nie jest ustawiona na true

Stwórzmy aplikację do publikowania i konsumowania wiadomości za pomocą klienta Java. Klient producenta Kafka składa się z następujących API.

KafkaProducer API

Zrozummy najważniejszy zestaw API producenta Kafki w tej sekcji. Centralną częścią interfejsu API KafkaProducer jest klasa KafkaProducer . Klasa KafkaProducer udostępnia opcję łączenia brokera Kafka w jego konstruktorze za pomocą następujących metod.

  • Klasa KafkaProducer udostępnia metodę send służącą do asynchronicznego wysyłania komunikatów do tematu. Podpis funkcji send () jest następujący

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord - Producent zarządza buforem nagrań oczekujących na wysłanie.

  • Callback - Dostarczone przez użytkownika wywołanie zwrotne do wykonania, gdy rekord zostanie potwierdzony przez serwer (wartość pusta oznacza brak wywołania zwrotnego).

  • Klasa KafkaProducer udostępnia metodę opróżniania, aby upewnić się, że wszystkie wcześniej wysłane wiadomości zostały faktycznie ukończone. Składnia metody flush jest następująca -

public void flush()
  • Klasa KafkaProducer udostępnia metodę partitionFor, która pomaga w uzyskaniu metadanych partycji dla danego tematu. Można tego użyć do niestandardowego partycjonowania. Podpis tej metody jest następujący -

public Map metrics()

Zwraca mapę wewnętrznych metryk utrzymywanych przez producenta.

  • public void close () - klasa KafkaProducer udostępnia bloki metod close do momentu zakończenia wszystkich wysłanych wcześniej żądań.

Producent API

Centralną częścią interfejsu API producenta jest klasa Producer . Klasa Producer udostępnia opcję połączenia brokera Kafka w jego konstruktorze za pomocą następujących metod.

Klasa producentów

Klasa producenta udostępnia metodę wysyłania do send wiadomości do jednego lub wielu tematów przy użyciu następujących podpisów.

public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

Istnieją dwa rodzaje producentów - Sync i Async.

Ta sama konfiguracja API dotyczy również producenta Sync . Różnica między nimi polega na tym, że producent synchronizacji wysyła wiadomości bezpośrednio, ale wysyła wiadomości w tle. Producent asynchroniczny jest preferowany, jeśli chcesz uzyskać większą przepustowość. W poprzednich wersjach, takich jak 0.8, producent asynchroniczny nie ma wywołania zwrotnego funkcji send () w celu zarejestrowania obsługi błędów. Jest to dostępne tylko w bieżącej wersji 0.9.

public void close ()

Klasa Producer zapewnia close metoda zamykania połączeń puli producentów dla wszystkich braci Kafka.

Ustawienia konfiguracji

Główne ustawienia konfiguracyjne interfejsu API producenta są wymienione w poniższej tabeli dla lepszego zrozumienia -

S.Nr Ustawienia i opis konfiguracji
1

client.id

identyfikuje aplikację producenta

2

producer.type

synchronizacja lub asynchronizacja

3

acks

Konfiguracja acks kontroluje kryteria w żądaniach producenta, które są uznawane za zakończone.

4

retries

Jeśli żądanie producenta nie powiedzie się, automatycznie ponów próbę z określoną wartością.

5

bootstrap.servers

bootstrapowanie listy brokerów.

6

linger.ms

jeśli chcesz zmniejszyć liczbę żądań, możesz ustawić w linger.ms wartość większą niż pewna wartość.

7

key.serializer

Klucz do interfejsu serializatora.

8

value.serializer

wartość dla interfejsu serializatora.

9

batch.size

Rozmiar bufora.

10

buffer.memory

kontroluje całkowitą ilość pamięci dostępnej dla producenta do buforowania.

ProducerRecord API

ProducerRecord to para klucz / wartość, która jest wysyłana do klastra Kafka. Konstruktor klasy ProducerRecord w celu utworzenia rekordu z parami partycji, klucz i wartość przy użyciu następującego podpisu.

public ProducerRecord (string topic, int partition, k key, v value)
  • Topic - nazwa tematu zdefiniowana przez użytkownika, która zostanie dołączona do rekordu.

  • Partition - liczba partycji

  • Key - klucz, który zostanie umieszczony w rekordzie.

  • Value - Nagraj zawartość
public ProducerRecord (string topic, k key, v value)

Konstruktor klasy ProducerRecord służy do tworzenia rekordu z kluczem, parami wartości i bez partycji.

  • Topic - Utwórz temat, aby przypisać rekord.

  • Key - klucz do rekordu.

  • Value - zawartość nagrania.

public ProducerRecord (string topic, v value)

Klasa ProducerRecord tworzy rekord bez partycji i klucza.

  • Topic - utwórz temat.

  • Value - zawartość nagrania.

Metody klasy ProducerRecord są wymienione w poniższej tabeli -

S.Nr Metody i opis zajęć
1

public string topic()

Temat zostanie dołączony do rekordu.

2

public K key()

Klucz, który zostanie umieszczony w rekordzie. Jeśli nie ma takiego klucza, tutaj zostanie ponownie zwrócona wartość null.

3

public V value()

Zapisz zawartość.

4

partition()

Liczba partycji dla rekordu

Aplikacja SimpleProducer

Przed utworzeniem aplikacji najpierw uruchom brokera ZooKeeper i Kafka, a następnie utwórz własny temat w brokerze Kafka za pomocą polecenia tworzenia tematu. Następnie utwórz klasę Java o nazwie Sim-pleProducer.java i wpisz następujące kodowanie.

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

Compilation - Aplikację można skompilować za pomocą następującego polecenia.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution - Aplikację można uruchomić za pomocą następującego polecenia.

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

Output

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

Prosty przykład dla konsumentów

Na chwilę obecną stworzyliśmy producenta do wysyłania wiadomości do klastra Kafka. Teraz stwórzmy konsumenta, który będzie konsumował wiadomości z klastra Kafka. Interfejs API KafkaConsumer służy do korzystania z komunikatów z klastra Kafka. Konstruktor klasy KafkaConsumer jest zdefiniowany poniżej.

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs - Zwróć mapę konfiguracji konsumenckich.

Klasa KafkaConsumer zawiera następujące istotne metody wymienione w poniższej tabeli.

S.Nr Metoda i opis
1

public java.util.Set<TopicPar-tition> assignment()

Pobierz zestaw partycji aktualnie przypisanych przez konsumenta.

2

public string subscription()

Zapisz się do podanej listy tematów, aby dynamicznie otrzymywać przypisane partycje.

3

public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)

Zapisz się do podanej listy tematów, aby dynamicznie otrzymywać przypisane partycje.

4

public void unsubscribe()

Wypisz tematy z podanej listy partycji.

5

public void sub-scribe(java.util.List<java.lang.String> topics)

Zapisz się do podanej listy tematów, aby dynamicznie otrzymywać przypisane partycje. Jeśli podana lista tematów jest pusta, jest traktowana tak samo, jak unsubscribe ().

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)

Wzorzec argumentu odwołuje się do wzorca subskrybowania w formacie wyrażenia regularnego, a argument odbiornika pobiera powiadomienia z wzorca subskrybowania.

7

public void as-sign(java.util.List<TopicParti-tion> partitions)

Ręcznie przypisz klientowi listę partycji.

8

poll()

Pobierz dane dla tematów lub partycji określonych za pomocą jednego z interfejsów API subskrybowania / przypisywania. Zwróci to błąd, jeśli tematy nie zostaną subskrybowane przed odpytywaniem o dane.

9

public void commitSync()

Przesunięcia zatwierdzone zwrócone w ostatniej ankiecie () dla wszystkich subskrybowanych list tematów i partycji. Ta sama operacja jest stosowana do commitAsyn ().

10

public void seek(TopicPartition partition, long offset)

Pobierz bieżącą wartość przesunięcia, której konsument użyje w następnej metodzie poll ().

11

public void resume()

Wznów wstrzymane partycje.

12

public void wakeup()

Obudź konsumenta.

ConsumerRecord API

Interfejs API ConsumerRecord służy do odbierania rekordów z klastra platformy Kafka. Ten interfejs API składa się z nazwy tematu, numeru partycji, z której jest odbierany rekord, oraz przesunięcia wskazującego na rekord w partycji Kafka. Klasa ConsumerRecord służy do tworzenia rekordu konsumenta z określoną nazwą tematu, liczbą partycji i parami <klucz, wartość>. Ma następujący podpis.

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • Topic - nazwa tematu dla rekordu konsumenta otrzymanego z klastra Kafka.

  • Partition - Partycja na temat.

  • Key - Klucz rekordu, jeśli żaden klucz nie istnieje, zostanie zwrócony null.

  • Value - Nagraj zawartość.

ConsumerRecords API

Interfejs API ConsumerRecords działa jako kontener dla ConsumerRecord. Ten interfejs API służy do przechowywania listy ConsumerRecord na partycję dla określonego tematu. Jego konstruktor jest zdefiniowany poniżej.

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition - Zwróć mapę partycji dla określonego tematu.

  • Records - Lista zwrotów ConsumerRecord.

ConsumerRecords ma zdefiniowane następujące metody.

S.Nr Metody i opis
1

public int count()

Liczba rekordów dla wszystkich tematów.

2

public Set partitions()

Zestaw przegród z danymi w tym zestawie rekordów (jeśli nie zwrócono żadnych danych, zestaw jest pusty).

3

public Iterator iterator()

Iterator umożliwia przechodzenie między zbieraniem, pozyskiwaniem lub usuwaniem elementów.

4

public List records()

Pobierz listę rekordów dla danej partycji.

Ustawienia konfiguracji

Poniżej wymieniono ustawienia konfiguracyjne głównego interfejsu API klienta klienta -

S.Nr Ustawienia i opis
1

bootstrap.servers

Lista startowa brokerów.

2

group.id

Przypisuje indywidualnego konsumenta do grupy.

3

enable.auto.commit

Włącz automatyczne zatwierdzanie dla przesunięć, jeśli wartość jest prawdziwa, w przeciwnym razie nie została zatwierdzona.

4

auto.commit.interval.ms

Zwróć, jak często aktualizowane używane przesunięcia są zapisywane w ZooKeeper.

5

session.timeout.ms

Wskazuje, ile milisekund Kafka będzie czekać, aż ZooKeeper odpowie na żądanie (odczyt lub zapis), zanim zrezygnuje i będzie kontynuował korzystanie z wiadomości.

SimpleConsumer Application

Kroki aplikacji producenta pozostają tutaj takie same. Najpierw uruchom swojego brokera ZooKeeper i Kafka. Następnie utwórz aplikację SimpleConsumer z klasą java o nazwie SimpleCon-sumer.java i wpisz następujący kod.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

Compilation - Aplikację można skompilować za pomocą następującego polecenia.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution − Aplikację można uruchomić za pomocą następującego polecenia

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

Input- Otwórz CLI producenta i wyślij kilka wiadomości do tematu. Możesz umieścić proste dane wejściowe jako „Witaj konsumencie”.

Output - Następujące będzie wyjście.

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer

Grupa konsumentów to wielowątkowa lub wielomaszynowa konsumpcja z tematów Kafki.

Grupa konsumentów

  • Konsumenci mogą dołączyć do grupy, używając tego samego group.id.

  • Maksymalna równoległość grupy to liczba odbiorców w grupie ← liczba przegród.

  • Kafka przypisuje partycje tematu do konsumenta w grupie, tak aby każda partycja była używana przez dokładnie jednego konsumenta w grupie.

  • Kafka gwarantuje, że wiadomość jest czytana tylko przez jednego konsumenta w grupie.

  • Konsumenci widzą wiadomość w kolejności, w jakiej zostały zapisane w dzienniku.

Przywrócenie równowagi konsumenta

Dodanie większej liczby procesów / wątków spowoduje ponowne zrównoważenie Kafki. Jeśli jakiemukolwiek konsumentowi lub brokerowi nie uda się wysłać pulsu do ZooKeepera, można go ponownie skonfigurować za pośrednictwem klastra Kafka. Podczas tego ponownego równoważenia Kafka przypisze dostępne partycje do dostępnych wątków, prawdopodobnie przenosząc partycję do innego procesu.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serialization.ByteArraySerializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

Kompilacja

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

Wykonanie

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group

Tutaj utworzyliśmy przykładową nazwę grupy jako my-group z dwoma konsumentami. Podobnie możesz utworzyć swoją grupę i liczbę konsumentów w grupie.

Wejście

Otwórz CLI producenta i wyślij kilka wiadomości, takich jak -

Test consumer group 01
Test consumer group 02

Wynik pierwszego procesu

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

Wynik drugiego procesu

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

Miejmy nadzieję, że zrozumielibyście SimpleConsumer i ConsumeGroup, korzystając z demonstracji klienta Java. Teraz masz pomysł, jak wysyłać i odbierać wiadomości za pomocą klienta Java. Kontynuujmy integrację Kafki z technologiami big data w następnym rozdziale.

W tym rozdziale dowiemy się, jak zintegrować Kafkę z Apache Storm.

O Storm

Storm został pierwotnie stworzony przez Nathana Marz i zespół BackType. W krótkim czasie Apache Storm stał się standardem dla rozproszonego systemu przetwarzania w czasie rzeczywistym, który umożliwia przetwarzanie ogromnych ilości danych. Storm jest bardzo szybki, a benchmark taktował go z prędkością ponad miliona krotek przetwarzanych na sekundę na węzeł. Apache Storm działa w sposób ciągły, pobiera dane ze skonfigurowanych źródeł (Spouts) i przekazuje je w dół potoku przetwarzania (Bolts). Połączone, wylewki i śruby tworzą topologię.

Integracja z Storm

Kafka i Storm w naturalny sposób uzupełniają się wzajemnie, a ich potężna współpraca umożliwia analizę strumieniową w czasie rzeczywistym dla szybko zmieniających się dużych zbiorów danych. Integracja Kafka i Storm ma ułatwić programistom pozyskiwanie i publikowanie strumieni danych z topologii Storm.

Koncepcyjny przepływ

Wylewka jest źródłem strumieni. Na przykład wylewka może odczytywać krotki z tematu Kafki i emitować je jako strumień. Śruba zużywa strumienie wejściowe, przetwarza i prawdopodobnie emituje nowe strumienie. Bolts może robić wszystko, od uruchamiania funkcji, filtrowania krotek, agregacji strumieniowych, łączenia strumieniowego, komunikacji z bazami danych i nie tylko. Każdy węzeł w topologii Storm działa równolegle. Topologia działa przez czas nieokreślony, dopóki jej nie zakończysz. Storm automatycznie ponownie przydzieli wszystkie nieudane zadania. Dodatkowo Storm gwarantuje, że nie nastąpi utrata danych, nawet jeśli maszyny ulegną awarii, a wiadomości zostaną odrzucone.

Przyjrzyjmy się szczegółowo interfejsom API integracji Kafka-Storm. Istnieją trzy główne klasy integrujące Kafkę ze Stormem. Są następujące -

BrokerHosts - ZkHosts & StaticHosts

BrokerHosts to interfejs, a ZkHosts i StaticHosts to jego dwie główne implementacje. ZkHosts służy do dynamicznego śledzenia brokerów Kafka poprzez utrzymywanie szczegółów w ZooKeeper, podczas gdy StaticHosts służy do ręcznego / statycznego ustawiania brokerów Kafka i ich szczegółów. ZkHosts to prosty i szybki sposób na dostęp do brokera Kafka.

Podpis ZkHosts jest następujący -

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

Gdzie brokerZkStr jest hostem ZooKeeper, a brokerZkPath jest ścieżką ZooKeeper do zarządzania danymi brokera Kafka.

KafkaConfig API

Ten interfejs API służy do definiowania ustawień konfiguracji dla klastra Kafka. Podpis Kafki Con-fig jest zdefiniowany w następujący sposób

public KafkaConfig(BrokerHosts hosts, string topic)

    Hosts - BrokerHosts może być ZkHosts / StaticHosts.

    Topic - nazwa tematu.

SpoutConfig API

Spoutconfig to rozszerzenie KafkaConfig, które obsługuje dodatkowe informacje ZooKeeper.

public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
  • Hosts - BrokerHosts może być dowolną implementacją interfejsu BrokerHosts

  • Topic - nazwa tematu.

  • zkRoot - Ścieżka główna ZooKeeper.

  • id −Dziobek przechowuje stan odsadzek, które zostały zużyte w Zookeeper. Identyfikator powinien jednoznacznie identyfikować Twoją wylewkę.

SchemeAsMultiScheme

SchemeAsMultiScheme to interfejs, który dyktuje, w jaki sposób ByteBuffer konsumowany przez Kafkę zostanie przekształcony w krotkę burzy. Wywodzi się z MultiScheme i akceptujemy implementację klasy Scheme. Istnieje wiele implementacji klasy Scheme, a jedną z takich implementacji jest StringScheme, który analizuje bajt jako prosty ciąg. Kontroluje również nazewnictwo pola wyjściowego. Podpis jest zdefiniowany w następujący sposób.

public SchemeAsMultiScheme(Scheme scheme)
  • Scheme - bufor bajtów zużyty z kafki.

KafkaSpout API

KafkaSpout to nasza realizacja wylewki, która będzie zintegrowana ze Stormem. Pobiera wiadomości z tematu kafka i emituje je do ekosystemu Storm jako krotki. KafkaSpout pobiera szczegóły konfiguracji z SpoutConfig.

Poniżej znajduje się przykładowy kod do stworzenia prostej wylewki Kafki.

// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts, 
   topicName, "/" + topicName UUID.randomUUID().toString());

//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Tworzenie śrub

Bolt to komponent, który pobiera krotki jako dane wejściowe, przetwarza krotkę i generuje nowe krotki jako dane wyjściowe. Bolts zaimplementuje interfejs IRichBolt. W tym programie do wykonywania operacji używane są dwie klasy śrub, WordSplitter-Bolt i WordCounterBolt.

Interfejs IRichBolt ma następujące metody -

  • Prepare- Zapewnia śrubie środowisko do wykonania. Wykonawcy uruchomią tę metodę w celu zainicjowania wylewki.

  • Execute - Przetwarzaj pojedynczą krotkę danych wejściowych.

  • Cleanup - Wołano, gdy zamyka się zasuwka.

  • declareOutputFields - Deklaruje schemat wyjściowy krotki.

Stwórzmy SplitBolt.java, który implementuje logikę dzielenia zdania na słowa i CountBolt.java, który implementuje logikę do oddzielania unikalnych słów i liczenia ich występowania.

SplitBolt.java

import java.util.Map;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class SplitBolt implements IRichBolt {
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }
   
   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      
      for(String word: words) {
         word = word.trim();
         
         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
         
      }

      collector.ack(input);
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}
   
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
   
}

CountBolt.java

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class CountBolt implements IRichBolt{
   Map<String, Integer> counters;
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
   OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);
      
      if(!counters.containsKey(str)){
         counters.put(str, 1);
      }else {
         Integer c = counters.get(str) +1;
         counters.put(str, c);
      }
   
      collector.ack(input);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counters.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
   
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Przesyłanie do topologii

Topologia Storm jest w zasadzie strukturą Thrift. Klasa TopologyBuilder udostępnia proste i łatwe metody tworzenia złożonych topologii. Klasa TopologyBuilder zawiera metody do ustawiania wylewki (setSpout) i ustawiania śruby (setBolt). Wreszcie, TopologyBuilder ma createTopology do tworzenia pology. shuffleGrouping i pola Metody grupowania pomagają ustawić grupowanie strumieni dla wylewki i śrub.

Local Cluster- Dla celów rozwoju, możemy stworzyć lokalny klaster korzystając LocalCluster obiekt, a następnie przedstawić topologię używając submitTopology metodę LocalCluster klasie.

KafkaStormSample.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;

public class KafkaStormSample {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";
      BrokerHosts hosts = new ZkHosts(zkConnString);
      
      SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,    
         UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
      builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
         
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());

      Thread.sleep(10000);
      
      cluster.shutdown();
   }
}

Przed przeniesieniem kompilacji, integracja Kakfa-Storm wymaga biblioteki java klienta ZooKeeper. Wersja 2.9.1 kuratora obsługuje Apache Storm w wersji 0.9.5 (której używamy w tym samouczku). Pobierz poniższe pliki jar i umieść je w ścieżce klas java.

  • curator-client-2.9.1.jar
  • curator-framework-2.9.1.jar

Po dołączeniu plików zależności skompiluj program za pomocą następującego polecenia,

javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java

Wykonanie

Uruchom interfejs wiersza polecenia Kafka Producer (wyjaśniony w poprzednim rozdziale), utwórz nowy temat o nazwie mój-pierwszy-temat i podaj kilka przykładowych wiadomości, jak pokazano poniżej -

hello
kafka
storm
spark
test message
another test message

Teraz uruchom aplikację za pomocą następującego polecenia -

java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*”:. KafkaStormSample

Przykładowe dane wyjściowe tej aplikacji są określone poniżej -

storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2

W tym rozdziale omówimy sposób integracji Apache Kafka z Spark Streaming API.

O Spark

Spark Streaming API umożliwia skalowalne, wysokoprzepustowe, odporne na błędy przetwarzanie strumieni danych na żywo. Dane mogą być pozyskiwane z wielu źródeł, takich jak Kafka, Flume, Twitter itp., I mogą być przetwarzane przy użyciu złożonych algorytmów, takich jak funkcje wysokiego poziomu, takie jak mapowanie, redukcja, łączenie i okno. Wreszcie przetworzone dane mogą być wypychane do systemów plików, baz danych i działających na żywo tablic rozdzielczych. Resilient Distributed Datasets (RDD) to podstawowa struktura danych platformy Spark. Jest to niezmienny, rozproszony zbiór obiektów. Każdy zestaw danych w RDD jest podzielony na partycje logiczne, które mogą być obliczane na różnych węzłach klastra.

Integracja z Spark

Kafka to potencjalna platforma przesyłania wiadomości i integracji dla przesyłania strumieniowego Spark. Kafka działa jako centralne centrum strumieni danych w czasie rzeczywistym i są przetwarzane przy użyciu złożonych algorytmów w usłudze Spark Streaming. Po przetworzeniu danych Spark Streaming może publikować wyniki w jeszcze innym temacie Kafka lub przechowywać w HDFS, bazach danych lub pulpitach nawigacyjnych. Poniższy diagram przedstawia koncepcyjny przepływ.

Przejdźmy teraz szczegółowo do interfejsu API Kafka-Spark.

SparkConf API

Reprezentuje konfigurację dla aplikacji Spark. Służy do ustawiania różnych parametrów Spark jako par klucz-wartość.

Klasa SparkConf ma następujące metody -

  • set(string key, string value) - ustaw zmienną konfiguracyjną.

  • remove(string key) - usuń klucz z konfiguracji.

  • setAppName(string name) - ustaw nazwę aplikacji dla swojej aplikacji.

  • get(string key) - zdobądź klucz

StreamingContext API

To jest główny punkt wejścia do funkcjonalności Spark. SparkContext reprezentuje połączenie z klastrem Spark i może służyć do tworzenia RDD, akumulatorów i zmiennych emisji w klastrze. Podpis jest zdefiniowany w sposób pokazany poniżej.

public StreamingContext(String master, String appName, Duration batchDuration, 
   String sparkHome, scala.collection.Seq<String> jars, 
   scala.collection.Map<String,String> environment)
  • master - adres URL klastra do połączenia (np. Mesos: // host: port, spark: // host: port, local [4]).

  • appName - nazwa pracy, która będzie wyświetlana w interfejsie WWW klastra

  • batchDuration - przedział czasu, w którym dane przesyłane strumieniowo zostaną podzielone na partie

public StreamingContext(SparkConf conf, Duration batchDuration)

Utwórz StreamingContext, podając konfigurację niezbędną dla nowego SparkContext.

  • conf - Parametry iskry

  • batchDuration - przedział czasu, w którym dane przesyłane strumieniowo zostaną podzielone na partie

KafkaUtils API

Interfejs API KafkaUtils służy do łączenia klastra Kafka z przesyłaniem strumieniowym Spark. Ten interfejs API ma znaczącą metodę createStream, zdefiniowaną poniżej.

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

Przedstawiona powyżej metoda służy do tworzenia strumienia wejściowego, który pobiera wiadomości od brokerów platformy Kafka.

  • ssc - Obiekt StreamingContext.

  • zkQuorum - Kworum Zookeeper.

  • groupId - identyfikator grupy dla tego konsumenta.

  • topics - zwróć mapę tematów do skonsumowania.

  • storageLevel - Poziom pamięci do przechowywania odebranych obiektów.

Interfejs API KafkaUtils ma inną metodę createDirectStream, która służy do tworzenia strumienia wejściowego, który bezpośrednio pobiera wiadomości od brokerów Kafka bez użycia żadnego odbiornika. Ten strumień może zagwarantować, że każda wiadomość od Kafki zostanie uwzględniona w przekształceniach dokładnie raz.

Przykładowa aplikacja jest wykonywana w Scali. Aby skompilować aplikację, pobierz i zainstaluj sbt , narzędzie do kompilacji scala (podobne do mavena). Poniżej przedstawiono główny kod aplikacji.

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

Zbuduj skrypt

Integracja iskra-kafka zależy od iskry, przesyłania strumieniowego iskier i słoika integracji Spark Kafka. Utwórz nowy plik build.sbt i określ szczegóły aplikacji oraz jej zależności. SBT pobierze niezbędne słoik podczas kompilacji i pakowania aplikacji.

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

Kompilacja / Pakowanie

Uruchom następujące polecenie, aby skompilować i spakować plik jar aplikacji. Musimy przesłać plik jar do konsoli Spark, aby uruchomić aplikację.

sbt package

Przesyłanie do Spark

Uruchom interfejs wiersza polecenia Kafka Producer (wyjaśniono w poprzednim rozdziale), utwórz nowy temat o nazwie my-first-topic i podaj kilka przykładowych komunikatów, jak pokazano poniżej.

Another spark test message

Uruchom następujące polecenie, aby przesłać aplikację do konsoli Spark.

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

Przykładowe dane wyjściowe tej aplikacji przedstawiono poniżej.

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..

Przeanalizujmy aplikację działającą w czasie rzeczywistym, aby uzyskać najnowsze kanały z Twittera i jej hashtagi. Wcześniej widzieliśmy integrację Storm i Spark z Kafką. W obu scenariuszach stworzyliśmy Kafka Producer (używając cli) do wysyłania wiadomości do ekosystemu Kafka. Następnie integracja burzy i iskier odczytuje wiadomości za pomocą konsumenta Kafki i wprowadza je odpowiednio do ekosystemu burzy i iskier. Więc praktycznie musimy stworzyć Kafka Producer, który powinien -

  • Czytaj kanały z Twittera za pomocą „Twitter Streaming API”,
  • Przetwarzaj pasze,
  • Wyodrębnij HashTags i
  • Wyślij to do Kafki.

Po otrzymaniu HashTagów przez Kafkę integracja Storm / Spark otrzymuje informacje i wysyła je do ekosystemu Storm / Spark.

Twitter Streaming API

Dostęp do „Twitter Streaming API” można uzyskać w dowolnym języku programowania. „Twitter4j” to nieoficjalna biblioteka Java o otwartym kodzie źródłowym, która udostępnia oparty na Javie moduł umożliwiający łatwy dostęp do „Twitter Streaming API”. „Twitter4j” udostępnia strukturę opartą na odbiorniku umożliwiającą dostęp do tweetów. Aby uzyskać dostęp do „Twitter Streaming API”, musimy zalogować się na konto programisty Twittera i otrzymać następujące informacjeOAuth szczegóły uwierzytelniania.

  • Customerkey
  • CustomerSecret
  • AccessToken
  • AccessTookenSecret

Po utworzeniu konta programisty pobierz pliki jar „twitter4j” i umieść je w ścieżce klas Java.

Pełne kodowanie producenta na Twitterze Kafka (KafkaTwitterProducer.java) jest wymienione poniżej -

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
   public static void main(String[] args) throws Exception {
      LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Sta-tus>(1000);
      
      if(args.length < 5){
         System.out.println(
            "Usage: KafkaTwitterProducer <twitter-consumer-key>
            <twitter-consumer-secret> <twitter-access-token>
            <twitter-access-token-secret>
            <topic-name> <twitter-search-keywords>");
         return;
      }
      
      String consumerKey = args[0].toString();
      String consumerSecret = args[1].toString();
      String accessToken = args[2].toString();
      String accessTokenSecret = args[3].toString();
      String topicName = args[4].toString();
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).get-Instance();
      StatusListener listener = new StatusListener() {
        
         @Override
         public void onStatus(Status status) {      
            queue.offer(status);

            // System.out.println("@" + status.getUser().getScreenName() 
               + " - " + status.getText());
            // System.out.println("@" + status.getUser().getScreen-Name());

            /*for(URLEntity urle : status.getURLEntities()) {
               System.out.println(urle.getDisplayURL());
            }*/

            /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
               System.out.println(hashtage.getText());
            }*/
         }
         
         @Override
         public void onDeletionNotice(StatusDeletionNotice statusDeletion-Notice) {
            // System.out.println("Got a status deletion notice id:" 
               + statusDeletionNotice.getStatusId());
         }
         
         @Override
         public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            // System.out.println("Got track limitation notice:" + 
               num-berOfLimitedStatuses);
         }

         @Override
         public void onScrubGeo(long userId, long upToStatusId) {
            // System.out.println("Got scrub_geo event userId:" + userId + 
            "upToStatusId:" + upToStatusId);
         }      
         
         @Override
         public void onStallWarning(StallWarning warning) {
            // System.out.println("Got stall warning:" + warning);
         }
         
         @Override
         public void onException(Exception ex) {
            ex.printStackTrace();
         }
      };
      twitterStream.addListener(listener);
      
      FilterQuery query = new FilterQuery().track(keyWords);
      twitterStream.filter(query);

      Thread.sleep(5000);
      
      //Add Kafka producer config settings
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      int i = 0;
      int j = 0;
      
      while(i < 10) {
         Status ret = queue.poll();
         
         if (ret == null) {
            Thread.sleep(100);
            i++;
         }else {
            for(HashtagEntity hashtage : ret.getHashtagEntities()) {
               System.out.println("Hashtag: " + hashtage.getText());
               producer.send(new ProducerRecord<String, String>(
                  top-icName, Integer.toString(j++), hashtage.getText()));
            }
         }
      }
      producer.close();
      Thread.sleep(5000);
      twitterStream.shutdown();
   }
}

Kompilacja

Skompiluj aplikację za pomocą następującego polecenia -

javac -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:. KafkaTwitterProducer.java

Wykonanie

Otwórz dwie konsole. Uruchom powyższą skompilowaną aplikację, jak pokazano poniżej, w jednej konsoli.

java -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:
. KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret>
<twitter-access-token>
<twitter-ac-cess-token-secret>
my-first-topic food

Uruchom dowolną aplikację Spark / Storm opisaną w poprzednim rozdziale w innym oknie. Należy przede wszystkim zauważyć, że zastosowany temat powinien być taki sam w obu przypadkach. Tutaj użyliśmy „mój-pierwszy-temat” jako nazwy tematu.

Wynik

Wynik tej aplikacji będzie zależał od słów kluczowych i aktualnego kanału na Twitterze. Przykładowe dane wyjściowe są określone poniżej (integracja z burzą).

. . .
food : 1
foodie : 2
burger : 1
. . .

Narzędzie Kafka spakowane pod adresem „org.apache.kafka.tools. *. Narzędzia są podzielone na narzędzia systemowe i narzędzia do replikacji.

Narzędzia systemowe

Narzędzia systemowe można uruchamiać z wiersza poleceń za pomocą skryptu klasy uruchamiania. Składnia jest następująca -

bin/kafka-run-class.sh package.class - - options

Niektóre z narzędzi systemowych są wymienione poniżej -

  • Kafka Migration Tool - To narzędzie służy do migracji brokera z jednej wersji do drugiej.

  • Mirror Maker - To narzędzie służy do tworzenia kopii lustrzanych jednego klastra Kafka w innym.

  • Consumer Offset Checker - To narzędzie wyświetla grupę konsumentów, temat, partycje, przesunięcie, rozmiar dziennika, właściciel dla określonego zestawu tematów i grupę konsumentów.

Narzędzie replikacji

Replikacja Kafki to narzędzie do projektowania wysokiego poziomu. Celem dodania narzędzia do replikacji jest większa trwałość i wyższa dostępność. Niektóre z narzędzi do replikacji są wymienione poniżej -

  • Create Topic Tool - Tworzy temat z domyślną liczbą partycji, współczynnikiem replikacji i używa domyślnego schematu Kafki do przypisywania repliki.

  • List Topic Tool- To narzędzie wyświetla informacje dla danej listy tematów. Jeśli w wierszu poleceń nie podano żadnych tematów, narzędzie wysyła zapytanie do Zookeepera, aby uzyskać wszystkie tematy i wyświetla informacje o nich. Pola wyświetlane przez narzędzie to nazwa tematu, partycja, lider, repliki, isr.

  • Add Partition Tool- Tworzenie tematu, należy określić liczbę podziałów na temat. Później może być potrzebnych więcej partycji dla tematu, gdy objętość tematu wzrośnie. To narzędzie pomaga dodawać więcej partycji dla określonego tematu, a także umożliwia ręczne przypisywanie dodanych partycji replik.

Kafka obsługuje wiele z najlepszych dzisiejszych aplikacji przemysłowych. W tym rozdziale przedstawimy bardzo krótki przegląd niektórych z najważniejszych zastosowań platformy Kafka.

Świergot

Twitter to serwis społecznościowy online, który zapewnia platformę do wysyłania i odbierania tweetów użytkowników. Zarejestrowani użytkownicy mogą czytać i wysyłać tweety, ale niezarejestrowani użytkownicy mogą czytać tylko tweety. Twitter wykorzystuje Storm-Kafka jako część infrastruktury przetwarzania strumieniowego.

LinkedIn

Apache Kafka jest używany na LinkedIn do danych strumienia aktywności i wskaźników operacyjnych. System mes-saging Kafka pomaga LinkedIn w różnych produktach, takich jak LinkedIn Newsfeed, LinkedIn Today do konsumpcji wiadomości online oraz oprócz systemów analitycznych offline, takich jak Hadoop. Silna trwałość Kafki jest również jednym z kluczowych czynników związanych z LinkedIn.

Netflix

Netflix to amerykański międzynarodowy dostawca strumieniowych multimediów internetowych na żądanie. Netflix używa platformy Kafka do monitorowania w czasie rzeczywistym i przetwarzania zdarzeń.

Mozilla

Mozilla to społeczność wolnego oprogramowania, utworzona w 1998 roku przez członków Netscape. Kafka wkrótce wymieni część obecnego systemu produkcyjnego Mozilli w celu zbierania danych o wydajności i użytkowaniu z przeglądarki użytkownika końcowego dla projektów takich jak telemetria, pilot testowy itp.

Wyrocznia

Oracle zapewnia natywną łączność z firmą Kafka z jej produktu Enterprise Service Bus o nazwie OSB (Oracle Service Bus), który umożliwia programistom wykorzystanie wbudowanych funkcji mediacji OSB do wdrażania etapowych potoków danych.