Apache Kafka

Apache Kafka, gerçek zamanlı veri işleme, depolama ve aktarımı için tasarlanmış açık kaynaklı bir dağıtık yayın-abone (publish-subscribe) mesajlaşma sistemi olarak öne çıkar. LinkedIn tarafından geliştirilmiş ve daha sonra Apache Software Foundation’a bağışlanmıştır. Büyük veri ve gerçek zamanlı analiz işlemleri için ideal olan Kafka, yüksek hacimli veri akışlarını etkili bir şekilde işleyebilir ve bu verileri disk üzerinde saklayabilir.
Docker ile çalıştıracağız.
docker-compose.yml için medium makalesini detaylı inceleyebilirsiniz.

version: “3”
services:
kafka_b:
image: docker.io/bitnami/kafka:3.4
hostname: kafka_b
ports:
– “9092:9092”
– “9094:9094”
volumes:
– “kafka_data:/bitnami”
environment:
– KAFKA_ENABLE_KRAFT=yes
– KAFKA_CFG_PROCESS_ROLES=broker,controller
– KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
– KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
– KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
– KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,EXTERNAL://kafka_b:9094
– KAFKA_BROKER_ID=1
[email protected]:9093
– ALLOW_PLAINTEXT_LISTENER=yes
– KAFKA_CFG_NODE_ID=1
– KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
– BITNAMI_DEBUG=yes
– KAFKA_CFG_NUM_PARTITIONS=2
volumes:
kafka_data:
driver: local

Topic

Topic Listeleme
kafka-topics.sh -bootstrap-server localhost:9092 list
-bootstrap-server hangi Kafka sunucusuna işlem yapılacağını seçiminde kullanılır.

Topic Create

TopicId = Bizde verebiliriz, vermezsek sistem otomatik olarak id üretiyor.
PartitionCount = Konfig dosyasında belirtilir. Bizde kaç istiyorsak tanımlayabiliriz.
ReplicationFactor = Partition olarak kaç kopya replik edilecek tanımlayabiliriz.

kafka-topics.sh -bootstrap-server localhost:9092 –topic ilktopic –create

Partition sayısını belirleyerek oluşturmak için.
kafka-topics.sh -bootstrap-server localhost:9092 –topic ilktopic –partitions 4 –create

Birden çok Kafka broker sunucusu olduğunda her bir partition kopyası başka broker sunucularında durur. Biz test sistemi kurduğumuz için çalışmayacaktır. her parititon yedeklilik için en az 2 kopyalı olsun istersek komutumuz:
kafka-topics.sh -bootstrap-server localhost:9092 –topic ilktopic –partitions 4 –replication-factor 2 –create

Topic Detayları

kafka-topics.sh -bootstrap-server localhost:9092 –topic ilktopic –describe
Leader partition replica işleminde hangi node’daki partition master onu belirler.

Topic Değiştirme

Topic create ettikten sonra değişiklik yapmak istersek –alter kullanıyoruz.
kafka-topics.sh -bootstrap-server localhost:9092 –alter –topic ilktopic –partitions 6

Kafka Produser Oluşturma

kafka-console-producer.sh -bootstrap-server localhost:9092 –topic ilktopic
> işareti sonrası produce işlemi yapabiliriz.
> 1. data
> 2. data
> 3. data

Kafka Consumer

kafka-console-consumer.sh -bootstrap-server localhost:9092 –topic ilktopic

Yukarıdaki gibi çalıştırdığımızda üstteki 3 veriyi göstermez. Bağlandığı andan itibaren gelenleri gösterir. Hepsini görmek istersek –from-beginning eklememiz gerekir.
kafka-console-consumer.sh -bootstrap-server localhost:9092 –topic ilktopic –from-beginning

Kaç tane consumer çalıştığını görmek için kafka-consumer-groups.sh kullanırız.
kafka-consumer-groups.sh  -bootstrap-server localhost:9092 –list
console-consumer-33283
console-consumer-33283

Her çalıştırmamızda grup id üretir. Buda kaldığı yerden çalışmayı engeller. Consumer tekrar çalıştığında kaldığı yerden devam etmesi içn grup id vermemiz lazım.

kafka-console-consumer.sh -bootstrap-server localhost:9092 –topic ilktopic –group group08

kafka-consumer-groups.sh  -bootstrap-server localhost:9092 –list
group08

kafka-consumer-groups.sh  -bootstrap-server localhost:9092 –describe –group group08
LAG partition üzerinde işlenen en son mesajdan sonra kalan veri sayısını gösterir.

Burada random_text.txt dosyasını consumer kapatıp gönderdiğimizde LAG göreceğiz. LAG ya consumer kapalı yada producer tarafından gönderilen veriyi işleyemiyor diyebiliriz.

birden çok consumer açtığımızda parztitionların atanması işlemini görmek için ikinci bir consumer çalıştırıyoruz. Komutumuzun sonuna ampersand işareti ile arkada planda consumer çalışır.
kafka-console-consumer.sh -bootstrap-server localhost:9092 –topic ilktopic –group group08 &
kafka-consumer-groups.sh  -bootstrap-server localhost:9092 –describe –group group08

Kafka Producer

Toplu bir veri produce etmek için bir txt içine random veri yükleyebilirsiniz.
kafka-console-producer.sh -bootstrap-server localhost:9092 –topic ilktopic < random_text.txt

Partition’lara nasıl veri geldiğiniz görebiliriz. peş peşe gönderilen verilere göre dağılımı görmek için; –time -1 bir önceki –time -2 ondan önceki, –time -3 daha önceki gibi gösterim sağlanabilir.
kafka-run-class.sh kafka.tools.GetOffsetShell -bootstrap-server localhost:9092 –topic ilktopic

ilktopic:0:1732
ilktopic:1:5146
ilktopic:2:5625
ilktopic:3:4148
ilktopic:4:4602
ilktopic:5:2301
ilktopic:6:5334
ilktopic:7:797

Key değeri vermeden Kafka verileri eşit dağıtmayı garanti etmez.

kafka-console-producer.sh –bootstrap-server localhost:9092 –topic product –property “parse.key=true” –property “key.separator=:”
>key1:vitamin
>key1:takviye
>key2:protein

kafka-run-class.sh kafka.tools.GetOffsetShell -bootstrap-server localhost:9092 –topic product

product:0:3
product:1:0
product:2:0
product:3:1

Kafka Producer Key Value

Producer ile Key, Value ile veri gönderebiliriz. Her key için gönderilen veri bir partition’a rasgele gider. Daha sonra gönderdiğimiz aynı key olan value ler aynı partition da toplanır.

kafka-console-producer.sh –bootstrap-server localhost:9092 –topic product –property “parse.key=true” –property “key.separator=:”

>key1:salih
>key3:deneme
>key4:krem
>key5:lolik
>key5:ikincikayit
>key9:ilkkayit
>key11:testilk

kafka-console-consumer.sh -bootstrap-server localhost:9092 –topic product –from-beginning –property key.separator=”:” –property print.key=true –property print.value=true –property print.partition=true

Partition:3:key2:protein
Partition:1:key3:deneme
Partition:0:key1:vitamin
Partition:0:key1:takviye
Partition:0:key1:salih
Partition:1:key4:krem
Partition:3:key5:lolik
Partition:3:key5:ikincikayit
Partition:3:key9:ilkkayit
Partition:2:key11:testilk

Consumer Partition Veri İşlemesi

kafka-console-consumer.sh -bootstrap-server localhost:9092 –topic product –from-beginning –property key.separator=”:” –property print.key=true –property print.value=true –property print.partition=true –partition 1 –offset 4 (5. kayıttan itibaren başlar)