Apache Kafka est une plateforme open-source de streaming de données développée par la fondation Apache. Conçue de base pour la gestion et la diffusion en temps réel de flux de données massifs, Kafka est utilisé pour la création de pipelines de données en temps réel, l'agrégation de données, la diffusion de messages, la surveillance d'événements et bien d'autres applications.
Kafka repose sur une architecture de journal distribué. Il traite les données sous forme de flux de messages organisés en "topics" (sujets). Chaque message est un enregistrement contenant une clé, une valeur et un horodatage. Les messages sont stockés dans des "logs" (journaux) et sont conservés pendant une période configurable.

Quelques notions

  • Producteur : Entité qui génère et envoie les messages vers le message broker. Ces messages peuvent être des notifications, des mises à jour, des événements, etc.
  • Consommateur : Entité qui reçoit et traite les messages.
  • Topic (ou sujet): Un topic est une catégorie de flux de messages. Les producteurs envoient des messages à des topics spécifiques, et les consommateurs lisent à partir de ces topics.
  • Broker : Un broker est un serveur Kafka qui stocke les données et les distribue aux producteurs et aux consommateurs. Kafka fonctionne généralement avec un cluster de brokers pour garantir la haute disponibilité et la tolérance aux pannes.

Prérequis

  • Java : Kafka est construit en Java, donc assurez-vous d'avoir Java (à partir de la version 8.0.0) installé sur votre système.
  • Zookeeper : Kafka utilise ZooKeeper pour la gestion de cluster. Bien qu'il soit possible de démarrer un cluster Kafka sans ZooKeeper, il est recommandé de l'inclure pour la gestion des méta-informations du cluster.

Téléchargement de Kafka

Rendez vous sur le site officiel d'Apache Kafka et téléchargez la dernière version de Kafka. 

Une fois ceci fait, extrayez l'archive dans le dossier souhaité.

Configuration de Kafka

Rendez vous dans le dossier Kafka et explorez les fichiers de configuration sous le répertoire Config.

Éditez le fichier server.properties pour modifier le chemin des logs de l'application. 

# A comma separated list of directories under which to store log files
log.dirs=<your_folder_path>/kafka_2.13-3.5.0/kafka-logs 

Éditez le fichier zookeeper.properties pour modifier le chemin du stockage de la data de Zookeeper. 

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=<your_folder_path>/kafka_2.13-3.5.0/zookeeper-data

Démarrage de ZooKeeper

Si vous utilisez ZooKeeper, démarrez le en exécutant la commande dans un terminal, à partir du dossier Kafka :

./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties

Si cette commande ne fonctionne pas, essayez d’ouvrir un terminal linux, bash par exemple. Puis exécutez cette commande :

./bin/zookeeper-server-start.sh config/zookeeper.properties

Démarrage du broker Kafka

Pour démarrer un broker Kafka, exécutez la commande suivante dans un terminal, à partir du dossier Kafka :

./bin/windows/kafka-server-start.sh ./config/server.properties

Si cette commande ne fonctionne pas, essayez d’ouvrir un terminal linux, bash par exemple. Puis exécutez cette commande :

./bin/kafka-server-start.sh config/server.properties

Une fois le serveur lancé, nous pouvons créer un système de producteur / consommateur en ligne de commande très rapidement. 

Dans le dossier <your_folder_path>\bin\windows, on peut trouver les fichiers kafka-console-consumer.bat et kafka-console-producer.bat

Ces fichiers nous serviront plus tard. Avant, nous avons besoin de créer des topics.

Pour créer un topic, il suffit de se placer dans le dossier <your_folder_path>\bin\windows et d'exécuter la commande suivante :

./kafka-topics.bat --create --bootstrap-server localhost:9092 --topic my-topic

Production et consommation de messages

Pour la production de messages, il faudra “ouvrir” le topic avec cette commande :

./kafka-console-producer.bat --broker-list localhost:9092 --topic my-topic

Une fois cette commande exécutée, la console “attendra” des messages, mais pour l’instant, attendons que le consommateur puisse recevoir les messages.

Pour ce faire, il faudra effectuer cette commande :

./kafka-console-consumer.bat --topic my_topic --bootstrap-server localhost:9092

À partir de maintenant, le consommateur sera en attente de réception des messages du producteur. Maintenant, si on envoie un message du côté producteur, il s’affichera en direct côté consommateur.

Producteur du message

Le producteur peut bien envoyer des messages via la console. 

Installation de Kafka & développement de producteur et consommateur

Consommateur du message

Le consommateur de message, s’il se met sur le bon “topic”, reçoit bien les messages du producteur.

Installation de Kafka & développement de producteur et consommateur

Installation avec Docker

Pour lancer Kafka et Zookeeper facilement avec Docker vous n’avez besoin que d’un seul fichier. Un docker-compose.yml va contenir l’image de zookeeper ainsi que celle de kafka, mais aussi les ports utilisés et le lien entre les deux services.

Pour la partie Zookeeper, on va définir l’image que l’on va utiliser, le nom du conteneur, les ports ainsi que le “networks” pour la persistance des données.

zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
networks:
- kafka-net

Idem pour la partie Kafka avec en plus, la partie environnement pour y définir les variables ainsi que la définition de son conteneur de dépendance.

kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
networks:
- kafka-net

Voici le fichier complet : 

version: '3'

services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
networks:
- kafka-net
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
networks:
- kafka-net
networks:
kafka-net:
driver: bridge

Pour lancer le serveur Kafka :

docker-compose up -d

Développement de producteur de message et de consommateur

Le producteur 

Pour cette partie, un peu de code sera nécessaire. On va donc commencer par la partie producteur de message. Avant de commencer, on va avoir besoin d’un module python, confluent-kafka.

pip install confluent-kafka

Ensuite, on va créer un fichier producer.py, on va initialiser la connexion avec le broker kafka et créer le sujet dans lequel on enverra le message.

from confluent_kafka import Producer

# Configuration du broker Kafka
bootstrap_servers = "localhost:9092"
topic = "my-topic"

Ensuite on va créer une instance de producteur et envoyer le maessage dans cette instance. 

# Création d'une instance de producteur Kafka
producer = Producer({"bootstrap.servers": bootstrap_servers})

# Envoi d'un message au topic Kafka
message = "Hello World!"
producer.produce(topic, value=message)

Pour finir, on va simplement attendre que le message s'envoie et affiche un message de confirmation quand il est envoyé dans le topic. 

# Attendre que le message soit envoyé
producer.flush()

# Afficher un message de confirmation quand le message est envoyé
print("Message sent to Kafka topic.✅")

Pour envoyer un message : python ./producer.py

Installation de Kafka & développement de producteur et consommateur

Le consommateur de message

Pour cette partie, on doit aussi configurer le broker Kafka, mais cette fois-ci avec le sujet sur lequel on va recevoir les messages. On va créer un fichier consumer.py

# Configuration du broker Kafka
bootstrap_servers = "localhost:9092"
topic = "my-topic"
group_id = "test-group"

Puis il nous faut créer une instance de consommateur pour Kafka, et s'abonner au sujet en question.

# Création d'une instance de consommateur Kafka
consumer = Consumer({
"bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"auto.offset.reset": "earliest"
})

# S'abonner au topic Kafka
consumer.subscribe([topic])

On va finir par lire la liste de ce topic, toutes les secondes.

# Lire les messages du topic Kafka
while True:
msg = consumer.poll(1.0) # timeout de 1 seconde

if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print("Reached end of partition")
else:
print("Error: {}".format(msg.error()))
else:
print("Received message: {}".format(msg.value().decode("utf-8")))

Pour recevoir les messages : python ./consumer.py

Installation de Kafka & développement de producteur et consommateur

Fichiers complets :

producer.py :

from confluent_kafka import Producer

# Configuration du broker Kafka
bootstrap_servers = "localhost:9092"
topic = "my-topic"

# Création d'une instance de producteur Kafka
producer = Producer({"bootstrap.servers": bootstrap_servers})

# Envoi d'un message au topic Kafka
message = "Hello !"
producer.produce(topic, value=message)

# Attendre que le message soit envoyé
producer.flush()

# Afficher un message de confirmation quand le message est envoyé
print("Message sent to Kafka topic.✅")

consumer.py :

from confluent_kafka import Consumer, KafkaError

# Configuration du broker Kafka
bootstrap_servers = "localhost:9092"
topic = "my-topic"
group_id = "test-group"

# Création d'une instance de consommateur Kafka
consumer = Consumer({
"bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"auto.offset.reset": "earliest"
})

# S'abonner au topic Kafka
consumer.subscribe([topic])

# Lire les messages du topic Kafka
while True:
msg = consumer.poll(1.0) # timeout de 1 seconde

if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print("Reached end of partition")
else:
print("Error: {}".format(msg.error()))
else:
print("Received message: {}".format(msg.value().decode("utf-8")))

On espère que cet article vous aura été utile ! Suivez notre wiki pour d'autres astuces et tutos !

Nos développeurs Next Decision ont les compétences techniques et les environnements de travail parfaits pour répondre à vos besoins, alors pas une seconde à perdre, Contactez-nous !