Apache Kafka - Основные операции
Сначала давайте приступим к реализации конфигурации с одним узлом и одним брокером,
а затем перенесем нашу настройку на конфигурацию с одним узлом и несколькими брокерами.
Надеюсь, вы уже установили Java, ZooKeeper и Kafka на свой компьютер. Прежде чем перейти к настройке кластера Kafka, сначала вам нужно будет запустить ZooKeeper, потому что кластер Kafka использует ZooKeeper.
Запустить ZooKeeper
Откройте новый терминал и введите следующую команду -
bin/zookeeper-server-start.sh config/zookeeper.properties
Чтобы запустить Kafka Broker, введите следующую команду -
bin/kafka-server-start.sh config/server.properties
После запуска Kafka Broker введите команду jps
в терминале ZooKeeper, и вы увидите следующий ответ:
821 QuorumPeerMain
928 Kafka
931 Jps
Теперь вы могли видеть два демона, работающих на терминале, где QuorumPeerMain - это демон ZooKeeper, а другой - демон Kafka.
Конфигурация с одним узлом и одним брокером
В этой конфигурации у вас есть один экземпляр ZooKeeper и идентификатор брокера. Ниже приведены шаги по его настройке -
Creating a Kafka Topic- Kafka предоставляет утилиту командной строки с именем kafka-topics.sh
для создания тем на сервере. Откройте новый терминал и введите приведенный ниже пример.
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic topic-name
Example
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic Hello-Kafka
Мы только что создали тему Hello-Kafka
с одним разделом и одним фактором реплики. Созданный выше вывод будет похож на следующий вывод -
Output- Создал тему Hello-Kafka
После создания темы вы можете получить уведомление в окне терминала брокера Kafka и журнал для созданной темы, указанный в «/ tmp / kafka-logs /» в файле config / server.properties.
Список тем
Чтобы получить список тем на сервере Kafka, вы можете использовать следующую команду -
Syntax
bin/kafka-topics.sh --list --zookeeper localhost:2181
Output
Hello-Kafka
Поскольку мы создали тему, в ней будет отображаться
только Hello-Kafka
. Предположим, что если вы создаете более одной темы, вы получите имена тем на выходе.
Запустите Producer для отправки сообщений
Syntax
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
Из приведенного выше синтаксиса для клиента командной строки производителя требуются два основных параметра:
Broker-list- Список брокеров, которым мы хотим отправлять сообщения. В этом случае у нас только один брокер. Файл Config / server.properties содержит идентификатор порта брокера, поскольку мы знаем, что наш брокер прослушивает порт 9092, поэтому вы можете указать его напрямую.
Название темы - вот пример названия темы.
Example
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
Производитель будет ждать ввода от стандартного ввода и опубликовать в кластере Kafka. По умолчанию каждая новая строка публикуется как новое сообщение, тогда свойства производителя по умолчанию указываются в файле config /
Producer.properties. Теперь вы можете ввести несколько строк сообщений в терминал, как показано ниже.
Output
$ bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Hello-Kafka[2016-01-16 13:50:45,931]
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message
Запустить потребителя для получения сообщений
Как и производитель, потребительские свойства по умолчанию указаны в файле config / consumer.proper-ties
. Откройте новый терминал и введите приведенный ниже синтаксис для получения сообщений.
Syntax
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name
--from-beginning
Example
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka
--from-beginning
Output
Hello
My first message
My second message
Наконец, вы можете вводить сообщения с терминала производителя и видеть, как они появляются в терминале потребителя. На данный момент у вас есть очень хорошее представление о кластере с одним узлом и одним брокером. Теперь перейдем к настройке нескольких брокеров.
Конфигурация одного узла и нескольких брокеров
Прежде чем перейти к настройке кластера с несколькими брокерами, сначала запустите сервер ZooKeeper.
Create Multiple Kafka Brokers- У нас уже есть один экземпляр брокера Kafka в файле con-fig / server.properties. Теперь нам нужно несколько экземпляров брокера, поэтому скопируйте существующий файл server.prop-erties в два новых файла конфигурации и переименуйте его как server-one.properties и server-two.prop-erties. Затем отредактируйте оба новых файла и назначьте следующие изменения -
config / server-one.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1
config / server-two.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2
Start Multiple Brokers- После внесения всех изменений на трех серверах откройте три новых терминала, чтобы запускать каждого брокера по очереди.
Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties
Теперь у нас на машине работают три разных брокера. Попробуйте сами проверить всех демонов, набравjps на терминале ZooKeeper, то вы увидите ответ.
Создание темы
Давайте назначим коэффициент репликации равным трем для этой темы, потому что у нас работает три разных брокера. Если у вас два брокера, то назначенное значение реплики будет равно двум.
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic topic-name
Example
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic Multibrokerapplication
Output
created topic “Multibrokerapplication”
Команда Describe
используется для проверки того, какой брокер прослушивает текущую созданную тему, как показано ниже -
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation
Output
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation
Topic:Multibrokerapplication PartitionCount:1
ReplicationFactor:3 Configs:
Topic:Multibrokerapplication Partition:0 Leader:0
Replicas:0,2,1 Isr:0,2,1
Из вышеприведенного вывода мы можем заключить, что первая строка дает сводку всех разделов, показывая имя темы, количество разделов и фактор репликации, который мы уже выбрали. Во второй строке каждый узел будет лидером для случайно выбранной части разделов.
В нашем случае мы видим, что наш первый брокер (с broker.id 0) является лидером. Затем Replicas: 0,2,1 означает, что все брокеры реплицируют тему, наконец, Isr
- это набор синхронизированных
реплик. Что ж, это подмножество реплик, которые на данный момент живы и догнали лидер.
Запустите Producer для отправки сообщений
Эта процедура остается такой же, как и при настройке с одним брокером.
Example
bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Multibrokerapplication
Output
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message
Запустить потребителя для получения сообщений
Эта процедура остается такой же, как показано в настройке с одним брокером.
Example
bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion --from-beginning
Output
bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message
Основные операции с темами
В этой главе мы обсудим различные основные операции с темами.
Изменение темы
Как вы уже поняли, как создать тему в Kafka Cluster. Теперь давайте изменим созданную тему, используя следующую команду
Syntax
bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name
--parti-tions count
Example
We have already created a topic “Hello-Kafka” with single partition count and one replica factor.
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181
--alter --topic Hello-kafka --parti-tions 2
Output
WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
Удаление темы
Чтобы удалить тему, вы можете использовать следующий синтаксис.
Syntax
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
Example
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka
Output
> Topic Hello-kafka marked for deletion
Note −Это не повлияет, если delete.topic.enable не установлено значение true