Apache Kafka - Основные операции

Сначала давайте приступим к реализации конфигурации с одним узлом и одним брокером, а затем перенесем нашу настройку на конфигурацию с одним узлом и несколькими брокерами.

Надеюсь, вы уже установили Java, ZooKeeper и Kafka на свой компьютер. Прежде чем перейти к настройке кластера Kafka, сначала вам нужно будет запустить ZooKeeper, потому что кластер Kafka использует ZooKeeper.

Запустить ZooKeeper

Откройте новый терминал и введите следующую команду -

bin/zookeeper-server-start.sh config/zookeeper.properties

Чтобы запустить Kafka Broker, введите следующую команду -

bin/kafka-server-start.sh config/server.properties

После запуска Kafka Broker введите команду jps в терминале ZooKeeper, и вы увидите следующий ответ:

821 QuorumPeerMain
928 Kafka
931 Jps

Теперь вы могли видеть два демона, работающих на терминале, где QuorumPeerMain - это демон ZooKeeper, а другой - демон Kafka.

Конфигурация с одним узлом и одним брокером

В этой конфигурации у вас есть один экземпляр ZooKeeper и идентификатор брокера. Ниже приведены шаги по его настройке -

Creating a Kafka Topic- Kafka предоставляет утилиту командной строки с именем kafka-topics.sh для создания тем на сервере. Откройте новый терминал и введите приведенный ниже пример.

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   
--partitions 1 --topic Hello-Kafka

Мы только что создали тему Hello-Kafka с одним разделом и одним фактором реплики. Созданный выше вывод будет похож на следующий вывод -

Output- Создал тему Hello-Kafka

После создания темы вы можете получить уведомление в окне терминала брокера Kafka и журнал для созданной темы, указанный в «/ tmp / kafka-logs /» в файле config / server.properties.

Список тем

Чтобы получить список тем на сервере Kafka, вы можете использовать следующую команду -

Syntax

bin/kafka-topics.sh --list --zookeeper localhost:2181

Output

Hello-Kafka

Поскольку мы создали тему, в ней будет отображаться только Hello-Kafka . Предположим, что если вы создаете более одной темы, вы получите имена тем на выходе.

Запустите Producer для отправки сообщений

Syntax

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

Из приведенного выше синтаксиса для клиента командной строки производителя требуются два основных параметра:

Broker-list- Список брокеров, которым мы хотим отправлять сообщения. В этом случае у нас только один брокер. Файл Config / server.properties содержит идентификатор порта брокера, поскольку мы знаем, что наш брокер прослушивает порт 9092, поэтому вы можете указать его напрямую.

Название темы - вот пример названия темы.

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

Производитель будет ждать ввода от стандартного ввода и опубликовать в кластере Kafka. По умолчанию каждая новая строка публикуется как новое сообщение, тогда свойства производителя по умолчанию указываются в файле config / Producer.properties. Теперь вы можете ввести несколько строк сообщений в терминал, как показано ниже.

Output

$ bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

Запустить потребителя для получения сообщений

Как и производитель, потребительские свойства по умолчанию указаны в файле config / consumer.proper-ties . Откройте новый терминал и введите приведенный ниже синтаксис для получения сообщений.

Syntax

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name 
--from-beginning

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka 
--from-beginning

Output

Hello
My first message
My second message

Наконец, вы можете вводить сообщения с терминала производителя и видеть, как они появляются в терминале потребителя. На данный момент у вас есть очень хорошее представление о кластере с одним узлом и одним брокером. Теперь перейдем к настройке нескольких брокеров.

Конфигурация одного узла и нескольких брокеров

Прежде чем перейти к настройке кластера с несколькими брокерами, сначала запустите сервер ZooKeeper.

Create Multiple Kafka Brokers- У нас уже есть один экземпляр брокера Kafka в файле con-fig / server.properties. Теперь нам нужно несколько экземпляров брокера, поэтому скопируйте существующий файл server.prop-erties в два новых файла конфигурации и переименуйте его как server-one.properties и server-two.prop-erties. Затем отредактируйте оба новых файла и назначьте следующие изменения -

config / server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config / server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

Start Multiple Brokers- После внесения всех изменений на трех серверах откройте три новых терминала, чтобы запускать каждого брокера по очереди.

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

Теперь у нас на машине работают три разных брокера. Попробуйте сами проверить всех демонов, набравjps на терминале ZooKeeper, то вы увидите ответ.

Создание темы

Давайте назначим коэффициент репликации равным трем для этой темы, потому что у нас работает три разных брокера. Если у вас два брокера, то назначенное значение реплики будет равно двум.

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic Multibrokerapplication

Output

created topic “Multibrokerapplication”

Команда Describe используется для проверки того, какой брокер прослушивает текущую созданную тему, как показано ниже -

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Output

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1

Из вышеприведенного вывода мы можем заключить, что первая строка дает сводку всех разделов, показывая имя темы, количество разделов и фактор репликации, который мы уже выбрали. Во второй строке каждый узел будет лидером для случайно выбранной части разделов.

В нашем случае мы видим, что наш первый брокер (с broker.id 0) является лидером. Затем Replicas: 0,2,1 означает, что все брокеры реплицируют тему, наконец, Isr - это набор синхронизированных реплик. Что ж, это подмножество реплик, которые на данный момент живы и догнали лидер.

Запустите Producer для отправки сообщений

Эта процедура остается такой же, как и при настройке с одним брокером.

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Multibrokerapplication

Output

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

Запустить потребителя для получения сообщений

Эта процедура остается такой же, как показано в настройке с одним брокером.

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion --from-beginning

Output

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

Основные операции с темами

В этой главе мы обсудим различные основные операции с темами.

Изменение темы

Как вы уже поняли, как создать тему в Kafka Cluster. Теперь давайте изменим созданную тему, используя следующую команду

Syntax

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name 
--parti-tions count

Example

We have already created a topic “Hello-Kafka” with single partition count and one replica factor. 
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181 
--alter --topic Hello-kafka --parti-tions 2

Output

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

Удаление темы

Чтобы удалить тему, вы можете использовать следующий синтаксис.

Syntax

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

Example

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

Output

> Topic Hello-kafka marked for deletion

Note −Это не повлияет, если delete.topic.enable не установлено значение true