Muhammed Fatih Doğmuş
Proje Teknik Lideri

Spring ile Kafka: Mesaj alma ve Verme

Spring ile Kafka: Mesaj alma ve Verme

Mikroservis mimarisinin yaygınlaşması ile beraber, asenkron iletişime olan ihtiyaç çok yüksek seviyelere ulaştı. Mesaj kuyrukları, bu asenkron iletişimi karşılamak için yegane çözümler olarak karşımıza çıkmaktadır. Günümüzde, çok fazla alternatif bulunmakta. Güvenilir, ölçeklenebilir, kendini kanıtlamış, iyi desteği olan mesaj kuyruğu bulmak ise çok kolay değil.

Kafka, karşımıza her türlü ölçekte mesajların iletilebilmesini sağlayan iyi bir alternatif olarak çıkmakta. Yıllarca production ortamında kendini kanıtlamış mimarisi, güçlü ekosistemi, açık kaynak doğasıyla açık ara en çok kullanılan mesaj kuyruğu oluyor.

Bu serimizde, sizlerle spring ile kafka entegrasyonunun nasıl yapılabileceğinden bahsedeceğim. 2.5 aylık kafka alt yapısı hazırlama serüvenimden sonra, spring ve kafka ile ilgili öğrendiğim ve yaparken çok zorlandığım, fazla kaynak bulamadığım konuları, sizlerle paylaşarak, benim çektiğim acıları çekmemenizi sağlamaya çalışacağım 🙂

Serimizin ilk yazısı olan bu yazıda, sizlere temel düzeyde kafka’nın temel konseptleri nelerdir, en temel şekilde bir spring uygulamasından kafka’ya nasıl mesaj yollarız, ve bu mesajları nasıl okuruzdan bahsedeceğim.

Kafka Konseptleri

İlk başta kafka’nın temel konseptlerinden başlayalım.

Broker

Broker, bir kafka instance’ıdır. Doğası gereği kafka, rahat ölçeklenebilir bir yapı ile tasarlanmıştır. Bu sebeple, bir kafka cluster’ında birden fazla kafka instance’ı, yani broker, olabilir. Bir kafka cluster’ına mesaj yazma veya okuma için bağlanacağımız vakitte, bu broker’ların adreslerini vererek bağlanırız.

Topic

Topic, kafka’nın gelen mesajları bölümleme yöntemidir, tam olmasa da veri tabanı tablolarına benzetebiliriz. Her topic’in bir adı vardır ve string ile temsil edilirler ve unique olmalıdırlar. Bir mesaj, her zaman bir topic’e gönderilir ve okuyacak kişi de mesajları belirttiği topic’ten okuması gerekir. Burada topic’lerin kullanım amacı, farklı türdeki event’lerin ayrılştırılması içindir. Her ne kadar farklı türdeki event’ler aynı topic’e basılabilse de, genellikle bir topic’e bir tür event basılması daha iyi bir pratiktir.

Partition

Partition, topic’lerin bölünerek ölçeklenebilmesini sağlar. Veri tabanındaki sharding mantığına benzer. Bir topic, bir veya birden fazla partition’dan oluşur. Her partition, bir sayı ile temsil edilir, ve topic oluşturulurken, topic’in kaç tane partition’a sahip olması gerektiğini söyleriz, mesela 15 gibi. Bu partition’lar, kafka cluster’ı içindeki broker’lara dağıtılırlar. Her broker, bir topic’teki belli bir partition aralığından sorumlu kılınır. Bir mesaj publish ettiğinizde, bu mesaj, ya elle geliştirici tarafından, ya da kafka tarafından otomatik olarak bir partition’a atanır. Partition belirlendikten sonra, bu mesaj, o partition’dan sorumlu olan broker’a atanır. Mesajın okunması gerektiği zaman da, okuması gereken taraf, ya topic’teki tüm partition’ları okur, veya daha ölçeklenebilir bir yapı istenirse, her bir consumer belli bir partition aralığından sorumlu olabilir.

Bir mesajın hangi partition’a gitmesi gerktiği iki farklı şekilde belirlenir: Ya siz mesajı publish ederken gitmesi gereken partition’ı elle verirsiniz, ya da bir partition key belirlersiniz. Eğer partition key verirseniz, kafka, partition key’in hash’ini alarak, bir partition’a gönderir. Böylece partition key’i aynı olan mesajlar, aynı partition’a gider. Bu bize, sıra ile işlenmesi gereken event’lerimiz varsa, bunların sıra ile işlenebilmesine olanak sağlar.

Partition’lar kafka’nın ölçeklenebilmesi için en önemli kısımlardır. Eğer bir topic’e çok fazla mesaj gideceğini biliyorsanız, bu topic’in partition’lanması önemli bir hale gelmekte. Fakat bu bilginin önceden bilinmesi çok mümkün olamayabiliyor. Bu sebeple kafka, bize topic oluşturulduktan sonra partition’ların değiştirilmesine olanak sağlıyor. Bu sayede ileride oluşabilecek ölçeklenebilme problemlerini çözmüş oluruz.

Offset

Offset, bir mesajın, bir partition içerisindeki konumunu belirtir. Bir partition’da birden fazla mesaj olabileceği için, her mesaj, bir offset’e sahiptir. Arka planda mesajlar okunduğunda, bu offset değerine göre okunur.

Acknowledgement

Consumer, kafka’dan bir mesajı okuduğunda, kafka’ya bir onay mesajı göndermesi gerekir. Buna da acknowledgement denir, veya kısaca ack. Bir mesaj için ack yollandığında, kafka o mesajı bir daha yollamaz. Fakat eğer belirtilen zamanda ack yollanmaz ise, örneğin mesaj iletilememiş olabilir, veya mesaj işlenirken bir sorun olmuş olabilir, o zaman kafka, mesajı tekrar gönderir.

Producer

Producer, bir client aracılığı ile kafka’ya mesaj iletilmesini yapan yapıdır. Producer’lar, bir broker adresleri üzerinden kafka’ya bağlanıp, mesajlarını belli formatlarda kafka’ya iletirler.

Consumer

Consumer, bir topic’e publish edilen mesajları okuyan yapıdır. Yine broker adresleri üzerinden kafka’ya bağlanır, ve bir topic’teki mesajları dinlemeye başlar.

Spring Entegrasyonu

Temel konseptlerden sonra, şimdi spring ile kafkaya nasıl bağlanıp ayarlarımızı yapabileceğimizi, mesaj gönderme ve alma işlemlerini nasıl yapabileceğimize bakalım

Bağlantı bilgileri

İlk başta projemizi oluşturalım. https://start.sprint.io adresine girip burada aşağıdaki bir ayarda spring projemizi oluşturuyoruz:

Kısaca eklediğimiz deepdency’leri anlatayım:

  • Spring web, bildiğiniz web uygulamaları yazmak için web MVC modülünü ekliyor
  • PostgreSQL driver, postgres’e bağlanabilmek için gerekli driver’ları getiriyor
  • Spring data JPA, veri tabanına erişimleri rahat yapmak için
  • Lombok, java’yı adam aden bir kütüphane 🙂 Getter/setter oluşturma, unchecked exception’ları kolayca checked exception’lara dönüştürme, toString ve hashCode gibi fonksiyonların otomatik yazılması gibi pek çok özelliği barındırıyor
  • Spring for Apache Kafka, hem kafka’ya bağlanmak için gerekli native kütüphanelerin çekilmesini, hem de spring’in kafka entegrasyonu ile ilgili gerekli modülleri barındırıyor

Bu sayfada generate diyip çıkan zip’in içerisindeki projemizi IDE’mizde açıyoruz.

Daha sonra, bağlanabilecek bir kafka instance’ına ihtiyacımı var. Bunun için genellikle bir docker compose dosyası yazılarak doğrudan bir container kaldırılması iyi oluyor. Ayrıca ileride bir veri tabanına ihtiyacımız olacağı için PostgreSQL’i de kaldırıyoruz

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
version: "3"

services:
  postgres:
    image: postgres:16.0-alpine
    ports:
      - "5432:5432"
    volumes:
      - ~/DockerVolumes/postgres:/bitnami/postgresql
    environment:
      - POSTGRESQL_TIMEZONE=Europe/Istanbul
      - POSTGRES_INITDB_ARGS=--locale-provider=icu --icu-locale=tr-TR
    restart: always
  kafka:
    image: bitnami/kafka:3.5.1
    ports:
      - "9092:9092"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,INTERNAL://:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
    restart: always
  kafka-ui:
    image: provectuslabs/kafka-ui:v0.7.1
    ports:
      - "8089:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9094
    restart: always
    depends_on:
      - kafka

Burada ayarlar karmaşık görünebilir ama standart ayarlardır. Burada hem kafkayı ayağa kaldırıyoruz, hem de kafka’ya erişmek için kullandığımı kafka-ui denen bir tool bulunmakta, bununla topic’leri, partition’ları, queue’daki mesajları görebiliyoruz. docker compose up -d dediğimizde PostgreSQL, kafka ve kafka-ui otomatik olarak ayağa kaldırılacaktır.

Ayrıca dikkat ettiyseniz, kafka’yı zookeeper değil, kraft protokolü ile kullanıyoruz. Geçmişte leader election ve diğer işlevler için kafka ile zookeeper’ı kullanmak zorunda olsa da, artık native bir şekilde bunu raft protokolü üzerinden yapabildiği için, zookeeper’a gerek kalmıyor.

Daha sonra uygulamamızdan veri tabanına ve kafka’ya bağlanmak için gerekli ayarları yapmamız gerekiyor. Spring boot, çok kolay bir şekilde bu ayarlarımızı properties dosyasından vermemizi sağlıyor.

1
2
3
4
5
6
spring.datasource.url=jdbc:postgresql://localhost:5432/postgres
spring.datasource.username=postgres
spring.datasource.password=q
spring.kafka.bootstrap-servers=http://localhost:9092
spring.kafka.consumer.group-id=kafka-blog
spring.kafka.consumer.enable-auto-commit=true

İlk üç satır veri tabanına bağlantı için gerekli bilgi iken, sonraki iki satır kafka için gerekli bağlantı bilgimiz. Burada birden fazla bootstrap server verebiliyoruz. Kafka, doğası gereği distributed mimaride çalıştığı için, bağlanmak istediğimiz instance’ların adreslerini buraya virgülle ayırarak verdiğimizde, spring otomatik olarak bu instance’lara bağlanacaktır.

Kafka ile ilgili diğer ayarlarımızda, kullanıcı adı, şifre, maksimum acknowledgement zamanı gibi ayarları da, spring üzerinden rahatlıkla yapabiliriz. Yapabileceğiniz tüm ayarlar için bu sayfada spring.kafka diye aratarak bulabilirsiniz.

Topic oluşturma

Bağlantımızı yaptıktan sonra, mesaj gönderip alacağımız topic’lere ihtiyacımız var. Bunu da yine spring üzerinden kolaylıkla sağlayabiliriz. NewTopic tipinde bir bean oluşturduğumuzda, kafka ile iletişime geçilerek, verilen topic’ler spring tarafında kafka’da oluşturuluyor:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class TopicConfiguration {

    @Bean
    NewTopic cartTopic() {
        return TopicBuilder.name("cart")
                .partitions(10)
                .build();
    }
    
    @Bean
    NewTopic userTopic() {
        return TopicBuilder.name("user")
                .partitions(5)
                .replicas(1)
                .build();
    }
}

TopicBuilder, spring’in bize sunduğu ve topic’lerimizi oluşturmamız kolaylaştıran bir builder class’ı. Burada topic’lerimizin ismini, kaç partition olması gerektiğini, kaç replica’sının olması gerektiği gibi ayarları verebiliyoruz. Bu oluşturulan bean’leri alıp, spring bunlarlar ilgili gerekli topic’leri kafka’da oluşturuyor.

Mesaj gönderme ve alma

Sonraki adımımız ise artık mesajları gönderme ve alma olacak. İlk başta arka planda ne olduğunu anlatabilmek için low level kafka apilerini anlatacağım.. Daha sonra spring’in sunduğu daha üst seviye API’ları anlatacağım, ki spring’in kıymetini bilelim 🙂

Native kafka API’ları ile gönderme

İlk başta kafka’nın Producer ve Consumer API’ları üzerinden gönderme ve almayı anlatacağım. Bu API’lar, spring ile doğrudan alakası olmayıp, direkt kafka java client’ı üzerinden geliyor.

Fakat bunları oluşturması normalde çok zahmetli iken, bağlantı ayarlarımızı spring üzerinden yaptığımız için bunların birer instance’ını almak zor olmuyor.

İlk başta, göndereceğmiz event’i oluşturmamız gerekiyor. Burada java record’ları kullanmak son derece kolay oluyor, zira default olarak kafka event’lerimizi string formatında bekliyor. Bu sebeple jackson ile rahatça objeleri string’e dönüştürebilmek için record’ların kullanılması faydalı oluyor.

Deneme için oluşturduğumuz cart topic’ine bir event gönderelim:

1
2
public record CartEvent(String id, String itemId, Double price, Long count) {
}

Bu event’i göndermek için gerekli olan kod aşağıdaki gibidir:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Service
@RequiredArgsConstructor
public class NativeKafkaProducer {

    private final ProducerFactory<String, String> producerFactory;
    private final ObjectMapper objectMapper;

    @SneakyThrows
    public void sendMessage() {
        Producer<String, String> producer = producerFactory.createProducer();
        CartEvent cartEvent = new CartEvent(UUID.randomUUID().toString(), "item_id_1", 250D, 3L);
        producer.send(new ProducerRecord<>("cart", objectMapper.writeValueAsString(cartEvent)));
    }
}

Aşama aşama neler olduğuna bakalım:

İlk başta, bu event’i kafka’ya publish edebilmek için ProducerFactory instance’ını autowire etmemiz gerekiyor. Bu factory, spring’in sağlamış olduğu, native kafka Producer’larını oluşturmamıza yardım olmak için sunduğu yardımcı bir factory class’ıdır. Normalde consumer ve producer’ları oluşturmak için her seferinde bağlantı bilgilerini elle vermemiz gerekiyor. Fakat biz bağlantı detaylarını zaten spring’e application.properties dosyasından geçtiğimiz için, spring’in sağlamış olduğu bu class’ı kullanarak rahatlıkla producer’larımızı oluşturabiliriz. Bu sayede oluşturularan producer’lar, bağlantı bilgilerini ve producer/consumer ile ilgili diğer ayarları, spring üzerinden alıyor, ve biz de elle bu bilgileri girme zahmetinden kurtuluyoruz.

Aynı zamanda bir jackson ObjectMapper class’ına ihtiyacımız var. Zira default olarak kafka, göndereceği mesajları string formatında bekliyor. Göndereceğimiz event objesini object mapper ile bir JSON string’ine dönüştürdüğümüzde, mesajı consume ettiğimiz yerde yine aynı şekilde bu JSON string’inden event objemizi geri oluşturmamıza olanak sağlayacak. Fakat tek serialization/deserialization yöntemi string değil. Blog serimizin sonraki yazısında, detaylı bir şekilde farklı yöntemlerden bahsediyor olacağım inşallah.

ProducerFactory üzerindeki createProducer fonksiyonunu kullanarak, bir producer instance’ı oluşturuyoruz. Daha sonra event’imizi oluşturup, producer.send fonsksiyonu ile gönderiyoruz. Buradaki send fonksiyonu, bir ProducerRecord objesi bekliyor. Bu obje, native kafka objesi olup, en temelde gönderilecek bir topic ve event’in kendisini bekler. Fakat farklı constructor’ları bulunmakta ve farklı header’ların verilmesi, partition key verilmesi, veya doğrudan gönderilmesi gereken partition’ların verilmesi gibi farklı ayarlar yapılabilmekte, tüm constructor’ları aşağıda bulabilirsiniz:

Burada event’imizi oluşturduğumuzda ProducerRecord class’ına paslamadan evvel jackson ile string’e dönüştürüyoruz.

Bu işlemleri yaptığımızda, mesajımız kafka’ya gönderilecektir.

Denemek için aşağıdaki gibi bir controller açıp localhost:8080/cart/native adresine bir get isteği gönderelim:

1
2
3
4
5
6
7
8
9
10
11
12
@RestController
@RequiredArgsConstructor
@RequestMapping("/cart")
public class CartProducerController {

    private final NativeKafkaProducer nativeKafkaProducer;

    @GetMapping("/native")
    public void sendNativeMessage() {
        nativeKafkaProducer.sendMessage();
    }
}

Ve mesajımızın başarılı bir şekilde gönderildiğini görmek için, kafka UI’ın localhost:8089 adresine girip Topics → cart → Messages bölümünü açtığımızda mesajımızı ekranda görebilmemiz gerekmekte:

Gördüğünüz gibi gönderme işlemi çok kolay. Spring ile bu daha da kolay bir hale geliyor. Bağlantı bilgilerini elle yönetmeye gerek kalmadan producer instance’larını rahat bir şekilde oluşturabiliyoruz. Şimdi consume etme kısmına bakalım.

Native kafka API’ları ile consume etme

Şimdi bu kısım biraz çirkin 🙂 Kafka’da pull-based bir event yönetimi yapılıyor. Yani mesaj almak isteyen client’lar, manuel bir şekilde sürekli message broker’a yeni bir event olup olmadığını sormaları gerekiyor. Maalesef kafka java API’ında native bir şekilde spring’deki controller’a benzer push based bir yöntem bulunmuyor. Fakat ilerki kısımda spring ile nasıl yapabileceğimize baktığımızda, daha zarif bir çözüm yapacağız inşallah. Fakat altında yatan mantığı önce anlamamız gerekiyor.

İlk başta genel mantığı ile başlayalım. Producer tarafında olduğu gibi, kafka consumer’larını da spring’in sağladığı mekanizmalar ile alabiliyoruz. Bir consumer oluşturduktan sonra, bir veya birden fazla topic’e subscribe olmamız gerekiyor. Genellikle tavsiye edilen, bir consumer’ın bir topic’ten sorumlu olması. Zira gelecek mesaj tiplerini genellikle topic bazlı böldüğümüz için, gelecek mesajın tipini önceden biliyor olmamız işleri kolaylaştıracaktır.

Daha sonra bir infinite loop içerisinde, sürekli mesaj var mı diye kontrol etmemiz gerekiyor. Daha sonra mesajı işleyip, belli bir süre uyuyup tekrar poll yapması gerekir.

Ayrıca, bu consume eden döngünün, ana thread’i block’lamaması için ayrı bir thread’de çalıştırılması gerekiyor. Aşağıda bir topic için örnek consumer bulabilirsiniz:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Service
@RequiredArgsConstructor
public class NativeKafkaCartConsumer {

    private final ConsumerFactory<String, String> consumerFactory;
    private final ObjectMapper objectMapper;
    private Consumer<String, String> consumer;

    @PostConstruct
    public void initConsumer() {
        consumer = consumerFactory.createConsumer();
        consumer.subscribe(List.of("cart"));
        Thread runnerThread = new Thread(this::executeMainLoop);
        runnerThread.start();
    }

    @SneakyThrows
    private void executeMainLoop() {
        while (true) {
            try {
                ConsumerRecords<String, String> messages = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : messages.records("cart")) {
                    CartEvent cartEvent = objectMapper.readValue(record.value(), CartEvent.class);
                    //Process message
										System.out.println(cartEvent);
                }
            } catch (IllegalStateException e) {
                break;
            }
            Thread.sleep(1000);
        }
    }

    @PreDestroy
    public void closeConsumer() {
        consumer.close(Duration.ofSeconds(5));
    }
}

consumer.poll methodu, bir Duration bekliyor. Bu süre zarfı boyunca sürekli message broker’a istek atıp, bu zaman periyodunda gelen bütün istekleri bir ConsumerRecord şeklinde bize dönüyor. İçerisindeki records fonksiyonu ile ham consumer record’ları alıp, onları daha rahat işleyebileceğimiz objeye dönüştürdükten yapmamız gereken şeyleri yapabiliriz. Daha sonra belli bir süre uyuyup, tekrar polling yapıyoruz.

Burada iki tane önemli kısım var: Birincisi, @PreDestroy ile işaretli fonksiyon içerisinde, consumer’ımızı kapatmamız gerekiyor. Öbürt türlü, bazı kaynaklar açık kalabilir. İkincisi de, consumer.poll fonksiyonunu çağırdığımız yerde try-catch yapmamız gerekiyor. Eğer kapatılmış bir consumer’da poll yapmaya çalıştığımızda, IllegalStateException fırlatıyor. Bu sayede infinite loop’u kırabiliriz.

Bunun haricinde, gelen mesajlar string türünde geliyor. Bunun sebebi, yukarda da açıklandığı gibi, mesajların string olarak serialize ve deserialize edilmesi. Bu sebeple elle jackson ObjectMapper ile istediğimiz objeye dönüştürmemiz gerekiyor. İlerki yazılarımızda bunu nasıl otomatik yapacağımıza değineceğim inşallah.

Tabii birden fazla topic’imiz olduğunda, tekrar eden çok fazla kod oluyor. Bu yöntemle mesajları consume etmek istediğimizde, ortak olan yerleri bir class’a çıkarak, sırayla tüm consumer’ları run edebiliriz. Fakat bu yöntemi kullanmayacağımız için, burada o tarz bir şeye girmiyorum.

Gördüğünüz gibi native kafka API’ları çok kullanışlı değil. Basit bir produce/consume için bile çok fazla kod yazmamız ve low level bir ton şeyle uğraşmamız gerekiyor. Tam bu haddede, spring devreye giriyor. Spring, pek çok üçüncü parti kütüphaneye yaptığı gibi, kafka’ya da bir wrapper yazarak, daha yüksek seviye bir soyutlama ile kafka’yı kullanmamızı sağlıyor.

Sprint KafkaTemplate ile mesaj gönderme

Şimdi çirkinliği görüp, dizlerimize kadar çamura battığımıza göre gün yüzü görebiliriz 🙂

KafkaTemplate, spring’in, özellikle producer tarafındaki işlemleri daha rahat bir şekilde yapılabilmesi için oluşturmuş olduğu kafka native API’ları üzerindeki bir higher level abstraction’dır. Tıpkı RestTemplate’de olduğu gibi, low level işlere bulaşmadan, daha rahat bir şekilde mesaj publish etmeye olanak sağlar. Ayrıca default olarak bir KafkaTemplate bean’i de spring tarafından oluşturulduğu için, herhangi bir producer oluşturmaya gerek kalmadan, her şeyi kendi hallediyor.

Yukarıdaki producer kodunun, KafkaTemplate ile yeniden yazılmış hali aşağıdadır:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
@RequiredArgsConstructor
public class SpringProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    @SneakyThrows
    public void sendMessages() {
        CartEvent cartEvent = new CartEvent(UUID.randomUUID().toString(), "item_id_1", 250D, 3L);
        kafkaTemplate.send("cart", objectMapper.writeValueAsString(cartEvent));
    }
}

Kod bir tık daha sade. Artık producer’ın oluşturulması ile biz ilgilenmiyoruz. Fakat KafkaTemplate çok daha fazlasını sunuyor:

  • Daha sade fonksiyonlar ile ProducerRecordoluşturmaktan kurtulabiliyoruz
  • sendDefault fonksiyonu ile, application.properties‘te tanımladığımız spring.kafka.template.default-topic parametresi ile, bir adet default topic için topic belirtmeye gerek kalmadan event gönderebiliyoruz
  • Çok daha rahat bir transaction yönetimine sahip oluyoruz(daha sonra bahsedeceğim)
  • Built-in monitoring yetenekleri bulunuyor. Micrometer kullandığımız vakitte, KafkaTemplate ile yolladığımız mesajların performansını otomatik olarak ölçebiliyoruz
  • Thread-safe bir yapısı bulunmakta. Mesaj göndermek için farklı producer’lara ihtiyacımız bulunmuyor, farklı thread’ler arasındaki senkronizasyonu kendisi sağlıyor
  • Gönderdiğimiz even’ler ile birlikte header ekleme ve yönetme işlemleri çok daha rahat oluyor

Bu ve benzeri bir çok fayda sağlanabilir. Hepsi doğrudan görünmese de, büyük ölçekteki bir uygulama için olmazsa olmaz niteliklere sahiptir.

Spring ile mesaj okuma

Mesaj göndermede avantajları olsa da, asıl kolaylık kısmı, mesajları consume etmemiz gerektiğinde ortaya çıkıyor. Native yöntemde gördüğünüz gibi çok fazla boilerplate kod var. Fakat spring, burada da devreye girerek bize çok kolay bir yöntem sunuyor.

Tıpkı web controller’larında olduğu gibi annotation tabanlı bir consumer API’ı sunmakta. Yukarıda verdiğimiz consumer’ın eşleniği, aşağıdaki gibidir:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Controller
@RequiredArgsConstructor
public class SpringCartConsumer {
    
    private final ObjectMapper objectMapper;
    
    @KafkaListener(topics = "cart")
    @SneakyThrows
    public void consumeCartEvent(final String event) {
        CartEvent cartEvent = objectMapper.readValue(event, CartEvent.class);
        System.out.println(cartEvent);
    }
}

Gördüğünüz gibi çok çok daha sade bir yapısı bulunuyor. Aslında bizim yaptığımız mekanizmanın aynısı, burada da çalışıyor. Fakat bu işlemlerin tamamını spring arkada tarafta yapıyor. Bu sayede pull based yöntemden, push based yönteme geçiyoruz ve bir event geldiği vakitte, spring bizim bu methodumuzu çağırarak, otomatik olarak bizi bilgilendiriyor.

Hala mesajları elle map’lememiz gerekiyor, fakat buna daha sonraki bir blog yazımda değineceğim inşallah.

Sonuç

Bu blog yazımızda, temel kafka konseptlerini, spring ile bir kafka cluster’ına nasıl bağlanabileceğimizi, ve native ve spring’in sağladığı API’lar ile kafka’ya nasıl mesaj gönderebileceğimizi gördük.

Sonraki blog yazımda, spring kafka’nın getirdiği exception handling, transaction yönetimi, retry gibi daha ileri konuları ele alacağım inşallah. Okuduğunuz için teşekkür ederim.

Kodların tamamına bu github repo’sundan ulaşabilirsiniz: Yte Kafka Blog