Apache Kafka
Apache Kafka, gerçek zamanlı veri işleme, depolama ve aktarımı için tasarlanmış açık kaynaklı bir dağıtık yayın-abone (publish-subscribe) mesajlaşma sistemi olarak öne çıkar. LinkedIn tarafından geliştirilmiş ve daha sonra Apache Software Foundation’a bağışlanmıştır. Büyük veri ve gerçek zamanlı analiz işlemleri için ideal olan Kafka, yüksek hacimli veri akışlarını etkili bir şekilde işleyebilir ve bu verileri disk üzerinde saklayabilir.
Docker ile çalıştıracağız.
docker-compose.yml için medium makalesini detaylı inceleyebilirsiniz.
version: “3”
services:
kafka_b:
image: docker.io/bitnami/kafka:3.4
hostname: kafka_b
ports:
– “9092:9092”
– “9094:9094”
volumes:
– “kafka_data:/bitnami”
environment:
– KAFKA_ENABLE_KRAFT=yes
– KAFKA_CFG_PROCESS_ROLES=broker,controller
– KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
– KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
– KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
– KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,EXTERNAL://kafka_b:9094
– KAFKA_BROKER_ID=1
– [email protected]:9093
– ALLOW_PLAINTEXT_LISTENER=yes
– KAFKA_CFG_NODE_ID=1
– KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
– BITNAMI_DEBUG=yes
– KAFKA_CFG_NUM_PARTITIONS=2
volumes:
kafka_data:
driver: local
Topic
Topic Listeleme
kafka-topics.sh -bootstrap-server localhost:9092 list
-bootstrap-server hangi Kafka sunucusuna işlem yapılacağını seçiminde kullanılır.
Topic Create
TopicId = Bizde verebiliriz, vermezsek sistem otomatik olarak id üretiyor.
PartitionCount = Konfig dosyasında belirtilir. Bizde kaç istiyorsak tanımlayabiliriz.
ReplicationFactor = Partition olarak kaç kopya replik edilecek tanımlayabiliriz.
kafka-topics.sh -bootstrap-server localhost:9092 –topic ilktopic –create
Partition sayısını belirleyerek oluşturmak için.
kafka-topics.sh -bootstrap-server localhost:9092 –topic ilktopic –partitions 4 –create
Birden çok Kafka broker sunucusu olduğunda her bir partition kopyası başka broker sunucularında durur. Biz test sistemi kurduğumuz için çalışmayacaktır. her parititon yedeklilik için en az 2 kopyalı olsun istersek komutumuz:
kafka-topics.sh -bootstrap-server localhost:9092 –topic ilktopic –partitions 4 –replication-factor 2 –create
Topic Detayları
kafka-topics.sh -bootstrap-server localhost:9092 –topic ilktopic –describe
Leader partition replica işleminde hangi node’daki partition master onu belirler.
Topic Değiştirme
Topic create ettikten sonra değişiklik yapmak istersek –alter kullanıyoruz.
kafka-topics.sh -bootstrap-server localhost:9092 –alter –topic ilktopic –partitions 6
Kafka Produser Oluşturma
kafka-console-producer.sh -bootstrap-server localhost:9092 –topic ilktopic
> işareti sonrası produce işlemi yapabiliriz.
> 1. data
> 2. data
> 3. data
Kafka Consumer
kafka-console-consumer.sh -bootstrap-server localhost:9092 –topic ilktopic
Yukarıdaki gibi çalıştırdığımızda üstteki 3 veriyi göstermez. Bağlandığı andan itibaren gelenleri gösterir. Hepsini görmek istersek –from-beginning eklememiz gerekir.
kafka-console-consumer.sh -bootstrap-server localhost:9092 –topic ilktopic –from-beginning
Kaç tane consumer çalıştığını görmek için kafka-consumer-groups.sh kullanırız.
kafka-consumer-groups.sh -bootstrap-server localhost:9092 –list
console-consumer-33283
console-consumer-33283
Her çalıştırmamızda grup id üretir. Buda kaldığı yerden çalışmayı engeller. Consumer tekrar çalıştığında kaldığı yerden devam etmesi içn grup id vermemiz lazım.
kafka-console-consumer.sh -bootstrap-server localhost:9092 –topic ilktopic –group group08
kafka-consumer-groups.sh -bootstrap-server localhost:9092 –list
group08
kafka-consumer-groups.sh -bootstrap-server localhost:9092 –describe –group group08
LAG partition üzerinde işlenen en son mesajdan sonra kalan veri sayısını gösterir.
Burada random_text.txt dosyasını consumer kapatıp gönderdiğimizde LAG göreceğiz. LAG ya consumer kapalı yada producer tarafından gönderilen veriyi işleyemiyor diyebiliriz.
birden çok consumer açtığımızda parztitionların atanması işlemini görmek için ikinci bir consumer çalıştırıyoruz. Komutumuzun sonuna ampersand işareti ile arkada planda consumer çalışır.
kafka-console-consumer.sh -bootstrap-server localhost:9092 –topic ilktopic –group group08 &
kafka-consumer-groups.sh -bootstrap-server localhost:9092 –describe –group group08
Kafka Producer
Toplu bir veri produce etmek için bir txt içine random veri yükleyebilirsiniz.
kafka-console-producer.sh -bootstrap-server localhost:9092 –topic ilktopic < random_text.txt
Partition’lara nasıl veri geldiğiniz görebiliriz. peş peşe gönderilen verilere göre dağılımı görmek için; –time -1 bir önceki –time -2 ondan önceki, –time -3 daha önceki gibi gösterim sağlanabilir.
kafka-run-class.sh kafka.tools.GetOffsetShell -bootstrap-server localhost:9092 –topic ilktopic
ilktopic:0:1732
ilktopic:1:5146
ilktopic:2:5625
ilktopic:3:4148
ilktopic:4:4602
ilktopic:5:2301
ilktopic:6:5334
ilktopic:7:797
Key değeri vermeden Kafka verileri eşit dağıtmayı garanti etmez.
kafka-console-producer.sh –bootstrap-server localhost:9092 –topic product –property “parse.key=true” –property “key.separator=:”
>key1:vitamin
>key1:takviye
>key2:protein
kafka-run-class.sh kafka.tools.GetOffsetShell -bootstrap-server localhost:9092 –topic product
product:0:3
product:1:0
product:2:0
product:3:1
Kafka Producer Key Value
Producer ile Key, Value ile veri gönderebiliriz. Her key için gönderilen veri bir partition’a rasgele gider. Daha sonra gönderdiğimiz aynı key olan value ler aynı partition da toplanır.
kafka-console-producer.sh –bootstrap-server localhost:9092 –topic product –property “parse.key=true” –property “key.separator=:”
>key1:salih
>key3:deneme
>key4:krem
>key5:lolik
>key5:ikincikayit
>key9:ilkkayit
>key11:testilk
kafka-console-consumer.sh -bootstrap-server localhost:9092 –topic product –from-beginning –property key.separator=”:” –property print.key=true –property print.value=true –property print.partition=true
Partition:3:key2:protein
Partition:1:key3:deneme
Partition:0:key1:vitamin
Partition:0:key1:takviye
Partition:0:key1:salih
Partition:1:key4:krem
Partition:3:key5:lolik
Partition:3:key5:ikincikayit
Partition:3:key9:ilkkayit
Partition:2:key11:testilk
Consumer Partition Veri İşlemesi
kafka-console-consumer.sh -bootstrap-server localhost:9092 –topic product –from-beginning –property key.separator=”:” –property print.key=true –property print.value=true –property print.partition=true –partition 1 –offset 4 (5. kayıttan itibaren başlar)