Batch Processing with Kafka using Karafka Batch
Not every update needs to happen in real time, yet many applications still write each change to the database one by one as it arrives, which puts unnecessary I/O load on the database. Batch processing is one way to reduce that load. In this article, I walk through a use case where an application has a webhook receiver that does not need real-time updates, so the incoming data can be buffered and processed in batches. Here is the story!
Table of Contents
- Use case Story
- Setup Karafka in Ruby on Rails
- Testing the Batch process
Use case Story
I found a case (a webhook receiver) in an application that doesn't need real-time updates. I will show diagrams of the existing flow (before batching) and of how I improved performance with a batching process.
Existing Process (Before)
The diagram above illustrates the flow when every request is processed directly. If a lot of requests arrive at once, database I/O increases significantly.
Adjust flow with Batching Process
In the diagram above, requests still enter the webhook receiver, but the data is then buffered in Kafka. A background job consumes the buffered data in batches, which decreases database I/O significantly.
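To make the difference concrete, here is a minimal sketch in plain Ruby that counts how many writes a store receives in each flow. `FakeStore`, `process_one_by_one` and `process_in_batches` are hypothetical names used only for illustration; they stand in for a real database and are not part of Karafka or Rails.

```ruby
# Hypothetical stand-in for a database that only counts write calls.
class FakeStore
  attr_reader :write_calls, :rows

  def initialize
    @write_calls = 0
    @rows = []
  end

  # One call equals one round trip to the "database",
  # however many rows it carries.
  def write(records)
    @write_calls += 1
    @rows.concat(records)
  end
end

# Before: every webhook request triggers its own write.
def process_one_by_one(store, requests)
  requests.each { |request| store.write([request]) }
end

# After: requests are buffered and written in groups of batch_size.
def process_in_batches(store, requests, batch_size)
  requests.each_slice(batch_size) { |batch| store.write(batch) }
end

requests = Array.new(213) { |i| { 'number' => i } }

direct = FakeStore.new
process_one_by_one(direct, requests)

batched = FakeStore.new
process_in_batches(batched, requests, 10)

puts "one by one: #{direct.write_calls} writes"
puts "batched:    #{batched.write_calls} writes"
```

Both stores end up with the same rows; only the number of round trips differs.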
Setup Karafka in Ruby on Rails
First, we need some gems:
# core karafka
gem 'karafka'
# web UI, needed for Karafka::Web.enable! in karafka.rb
gem 'karafka-web'
# for testing
gem 'karafka-testing'
gem 'rspec'
After installing them, I got these versions in Gemfile.lock. They may be different if you follow this article at a later date 😄
# .....
    karafka (2.1.2)
      karafka-core (>= 2.0.13, < 3.0.0)
      thor (>= 0.20)
      waterdrop (>= 2.5.3, < 3.0.0)
      zeitwerk (~> 2.3)
    karafka-core (2.0.13)
      concurrent-ruby (>= 1.1)
      karafka-rdkafka (>= 0.12.3)
    karafka-rdkafka (0.12.3)
      ffi (~> 1.15)
      mini_portile2 (~> 2.6)
      rake (> 12)
    karafka-testing (2.1.0)
      karafka (>= 2.1.0, < 3.0.0)
    karafka-web (0.5.2)
      erubi (~> 1.4)
      karafka (>= 2.0.40, < 3.0.0)
      karafka-core (>= 2.0.12, < 3.0.0)
      roda (~> 3.63)
      tilt (~> 2.0)
# ........
After the gems are installed, create a karafka.rb file in the root directory of the Rails app.
class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = { 'bootstrap.servers': '127.0.0.1:9092' }
    config.client_id = 'Conto'
    config.concurrency = 2
    config.max_wait_time = 500 # 0.5 second
    config.max_messages = 10 # Adjust to your requirements (default: 100)
    # Recreate consumers with each batch. This will allow Rails code reload to work in the
    # development mode. Otherwise Karafka process would not be aware of code changes
    config.consumer_persistence = !Rails.env.development?
  end

  # Comment out this part if you are not using instrumentation and/or you are not
  # interested in logging events for certain environments. Since instrumentation
  # notifications add extra boilerplate, if you want to achieve max performance,
  # listen to only what you really need for given environment.
  Karafka.monitor.subscribe(Karafka::Instrumentation::LoggerListener.new)
  # Karafka.monitor.subscribe(Karafka::Instrumentation::ProctitleListener.new)
  Karafka.producer.monitor.subscribe(
    WaterDrop::Instrumentation::LoggerListener.new(Karafka.logger)
  )

  routes.draw do
    # This needs to match queues defined in your ActiveJobs
    active_job_topic :default do
      # Expire jobs after 1 day
      config(partitions: 5, 'retention.ms': 86_400_000)
    end

    topic :batch_example do
      config(partitions: 2)
      consumer Kafka::Consumers::BatchExample # consumer class name
    end
  end
end

Karafka::Web.enable!

# You can tag your processes with any info you want and it is going to be visible via the Web UI
git_hash = `git rev-parse --short HEAD`.strip
Karafka::Process.tags.add(:commit, "##{git_hash}")
The configuration above is essentially the standard one from https://github.com/karafka/example-apps.
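The two settings that shape batching in this configuration are max_messages and max_wait_time. Roughly speaking, a batch is handed to the consumer once it holds max_messages messages or once max_wait_time has passed, whichever comes first. The toy model below illustrates that rule only. `ToyBatcher` is a hypothetical class, time is passed in by hand as plain milliseconds, and none of this claims to mirror how Karafka polls internally.

```ruby
# Toy model of "flush on size or on timeout". Times are plain integers in
# milliseconds supplied by the caller, so the example is deterministic.
class ToyBatcher
  attr_reader :batches

  def initialize(max_messages:, max_wait_time:)
    @max_messages = max_messages
    @max_wait_time = max_wait_time
    @buffer = []
    @batches = []
    @window_started_at = nil
  end

  # Record a message that arrived at time `now` (ms).
  def push(message, now)
    tick(now) # a wait window may have expired before this message arrived
    @window_started_at ||= now
    @buffer << message
    flush if @buffer.size >= @max_messages
  end

  # Let time pass without a new message.
  def tick(now)
    return if @buffer.empty?

    flush if now - @window_started_at >= @max_wait_time
  end

  private

  # Hand the buffered messages over as one batch and start fresh.
  def flush
    @batches << @buffer
    @buffer = []
    @window_started_at = nil
  end
end

batcher = ToyBatcher.new(max_messages: 10, max_wait_time: 500)
23.times { |i| batcher.push(i, i) } # 23 messages, 1 ms apart
batcher.tick(1_000)                 # nothing else arrives, the wait expires
p batcher.batches.map(&:size)       # two full batches, then the 3 leftovers
```

Raising max_messages gives larger batches under load, while max_wait_time bounds how long a partially filled batch waits.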
Create the producer and consumer, and integrate the Web UI if you need it
This article only aims to demonstrate batch processing, so I will create a simple example for testing to illustrate the idea. In this example, I put the files inside ./app/jobs/kafka/, like this:
❯ tree app/jobs
app/jobs
├── application_job.rb
└── kafka
    ├── consumers
    │   └── batch_example.rb
    └── producers
        └── batch_example.rb
4 directories, 3 files
# ../consumers/batch_example.rb
# note: this class is referenced in karafka.rb (see above)
# frozen_string_literal: true
class Kafka::Consumers::BatchExample < Karafka::BaseConsumer
  def consume
    Rails.logger.debug '==================== BATCH CONSUMER START ===================='
    messages.each do |message|
      Rails.logger.debug message.payload
    end
    Rails.logger.debug '==================== BATCH CONSUMER END ===================='
    sum = messages.sum { |message| message.payload['number'] }
    Karafka.logger.info "Sum of #{messages.count} elements equals to: #{sum}"
  rescue => e
    Karafka.logger.error e
  end
end
In this consumer you can do anything you like with the batch, such as updating data in bulk. In this article I only give a simple example to help you picture batch processing with Kafka.
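As a sketch of what "doing something with the batch" could look like, the helper below collapses a batch of payloads into one row per record and keeps only the latest value, so the database would be touched once per batch instead of once per message. The 'id' field, the `rows_for_bulk_write` helper and the `Counter` model named in the closing comment are assumptions made for illustration; the payloads in this article only carry 'number'.

```ruby
# Collapse a batch of payload hashes into one row per id.
# Later payloads overwrite earlier ones with the same id.
def rows_for_bulk_write(payloads)
  latest = {}
  payloads.each { |payload| latest[payload['id']] = payload }
  latest.values
end

payloads = [
  { 'id' => 1, 'number' => 0.25 },
  { 'id' => 2, 'number' => 0.50 },
  { 'id' => 1, 'number' => 0.75 } # newer value for id 1
]

rows = rows_for_bulk_write(payloads)
p rows

# Inside a consumer this could become a single statement, for example with a
# hypothetical ActiveRecord model named Counter:
#   Counter.upsert_all(rows_for_bulk_write(messages.payloads))
```

Whether "last value wins" is the right rule depends on the data; summing or appending may fit other use cases better.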
# ../producers/batch_example.rb
# frozen_string_literal: true
class Kafka::Producers::BatchExample
  def initialize(messages)
    @messages = messages
  end

  def perform
    ::Karafka.producer.produce_async(
      topic: 'batch_example',
      payload: @messages.to_json,
    )
  end
end
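The producer above sends one message per call. WaterDrop, the library behind Karafka.producer, also provides produce_many_async, which takes an array of message hashes. The sketch below only builds that array; `build_messages` is a hypothetical helper, and the dispatch itself is left as a comment because it needs a running broker.

```ruby
require 'json'

# Turn a list of payload hashes into message hashes with :topic and :payload,
# the same keys the producer above passes to produce_async.
def build_messages(payloads, topic: 'batch_example')
  payloads.map { |payload| { topic: topic, payload: payload.to_json } }
end

messages = build_messages([{ 'number' => 1 }, { 'number' => 2 }])
p messages

# With a broker available, the whole array can be dispatched in one call:
#   Karafka.producer.produce_many_async(messages)
```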
Finally, don't forget to mount the Karafka Web UI in config/routes.rb if you need it 👌
# config/routes.rb
# frozen_string_literal: true
require 'karafka/web'

Rails.application.routes.draw do
  # other routes ....
  mount Karafka::Web::App, at: '/karafka'
end
Testing the batch process
This is the last part of the article. First, I will show how to run the test manually from the terminal. Check this out!
Before testing, whether manually or with RSpec, you need to run Kafka locally. In this article I run Kafka with Docker. Here is my docker-compose.yml:
version: '3'
services:
  kafka:
    image: bitnami/kafka:latest
    container_name: kafka
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_NODE_ID=1 # Set the KAFKA_CFG_NODE_ID for the quorum controller
    ports:
      - '9092:9092'
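Once the container is up, it can help to confirm that something is listening on the advertised port before starting the Karafka server. The check below is a minimal sketch using only Ruby's standard library; `port_open?` is a hypothetical helper, and a successful connection only shows that the port accepts TCP connections, not that the broker is healthy.

```ruby
require 'socket'

# Returns true when a TCP connection to host:port can be opened within
# `timeout` seconds, false when the connection is refused or fails.
def port_open?(host, port, timeout: 1)
  Socket.tcp(host, port, connect_timeout: timeout) { true }
rescue SystemCallError, SocketError
  false
end

# 127.0.0.1:9092 matches the listener from the docker-compose.yml above.
puts "Kafka port reachable: #{port_open?('127.0.0.1', 9092)}"
```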
Manual Test
First, execute this command in the root directory:
❯ bundle exec karafka server
@@@ @@@@@ @@@
@@@ @@@ @@@
@@@ @@@ @@@@@@@@@ @@@ @@@ @@@@@@@@@ @@@@@@@@ @@@ @@@@ @@@@@@@@@
@@@@@@ @@@ @@@ @@@@@ @@@ @@@ @@@ @@@@@@@ @@@ @@@
@@@@@@@ @@@ @@@ @@@ @@@@ @@@ @@@ @@@@@@@ @@@ @@@
@@@ @@@@ @@@@@@@@@@ @@@ @@@@@@@@@@ @@@ @@@ @@@@ @@@@@@@@@@
Upgrade to Karafka Pro for more features and support: <https://karafka.io>
Running in ruby 3.2.1 (2023-02-08 revision 31819e82c8) [arm64-darwin22]
Running Karafka 2.1.2 server
See LICENSE and the LGPL-3.0 for licensing details
[0a124f3e4679] Async producing of 1 messages to 1 topics took 1.6779999732971191 ms
[0a124f3e4679] [{:topic=>"karafka_consumers_reports", :payload=>"{\\"schema_version\\":\\"1.2.0\\",\\"type\\":\\"consumer\\",\\"dispatched_at\\":1685269658.467652,\\"process\\":{\\"started_at\\":1685269658.1541078,\\"name\\":\\"MacBooks-MacBook-Air.local:17317:ba2141f30114\\",\\"status\\":\\"running\\",\\"listeners\\":2,\\"concurrency\\":2,\\"memory_usage\\":145456,\\"memory_total_usage\\":3384120,\\"memory_size\\":8589934592,\\"cpu_count\\":8,\\"cpu_usage\\":[2.58,2.48,2.48],\\"tags\\":[\\"#33bce0e\\"]},\\"versions\\":{\\"ruby\\":\\"ruby 3.2.1-31 31819e\\",\\"karafka\\":\\"2.1.2\\",\\"waterdrop\\":\\"2.5.3\\",\\"karafka_core\\":\\"2.0.13\\",\\"rdkafka\\":\\"0.12.3\\",\\"librdkafka\\":\\"2.0.2\\"},\\"stats\\":{\\"busy\\":0,\\"enqueued\\":0,\\"utilization\\":0,\\"total\\":{\\"batches\\":0,\\"messages\\":0,\\"errors\\":0,\\"retries\\":0,\\"dead\\":0}},\\"consumer_groups\\":{},\\"jobs\\":[]}", :key=>"MacBooks-MacBook-Air.local:17317:ba2141f30114", :partition=>0}]
[4aa7272886bc] Polling messages...
[881a45a6e649] Polling messages...
#............
Once the Karafka server is up, you can use the Rails console to check whether batch processing works. I executed a command like this:
❯ rails c
Loading development environment (Rails 7.0.4.2)
irb(main):001:0> 213.times.each { |_| Kafka::Producers::BatchExample.new({ 'number' => rand }).perform }
[9a66dd3bbb9b] Async producing of a message to 'batch_example' topic took 13.630000114440918 ms
[9a66dd3bbb9b] {:topic=>"batch_example", :payload=>"{\\"number\\":0.6800771225867617}"}
[9a66dd3bbb9b] Async producing of a message to 'batch_example' topic took 0.026000261306762695 ms
[9a66dd3bbb9b] {:topic=>"batch_example", :payload=>"{\\"number\\":0.6247043818551317}"}
[9a66dd3bbb9b] Async producing of a message to 'batch_example' topic took 0.012000083923339844 ms
[9a66dd3bbb9b] {:topic=>"batch_example", :payload=>"{\\"number\\":0.41191365984889317}"}
[9a66dd3bbb9b] Async producing of a message to 'batch_example' topic took 0.009999990463256836 ms
[9a66dd3bbb9b] {:topic=>"batch_example", :payload=>"{\\"number\\":0.9100126240824636}"}
[9a66dd3bbb9b] Async producing of a message to 'batch_example' topic took 0.009999752044677734 ms
[9a66dd3bbb9b] {:topic=>"batch_example", :payload=>"{\\"number\\":0.07528866104859089}"}
#.........
After executing the command above, you can confirm that batch processing succeeded by looking at the log in the terminal where bundle exec karafka server is running.
The messages were processed in batches of 10 (the configured max_messages), and the last batch contained only the 3 remaining messages, because no more messages arrived within the configured max_wait_time = 500 (0.5 second). If you change these numbers, you can see the effect yourself with your own configuration 🤝.
Testing with RSpec and karafka-testing
After testing manually, I will write a unit test with rspec and karafka-testing.
First, create the _spec.rb file inside test/karafka (the location is up to you 😃).
# test/karafka/batch_example_spec.rb
# frozen_string_literal: true
require 'rails_helper'

RSpec.describe Kafka::Consumers::BatchExample do
  subject(:consumer) { karafka.consumer_for(:batch_example) }

  let(:nr1_value) { rand }
  let(:nr2_value) { rand }
  let(:sum) { nr1_value + nr2_value }

  before do
    # Sends first message to Karafka consumer
    karafka.produce({ 'number' => nr1_value }.to_json)
    # Sends second message to Karafka consumer
    karafka.produce({ 'number' => nr2_value }.to_json)
    allow(Karafka.logger).to receive(:info)
  end

  it 'expects to log a proper message' do
    expect(Karafka.logger).to receive(:info).with("Sum of 2 elements equals to: #{sum}")
    consumer.consume
  end
end
Don't forget to include the Karafka RSpec helpers in rails_helper.rb:
# .....
require 'karafka/testing/rspec/helpers'
# .....
RSpec.configure do |config|
  # .....
  config.include Karafka::Testing::RSpec::Helpers
  # .....
end
Then run the rspec command:
❯ bundle exec rspec test/karafka/batch_example_spec.rb
.
Finished in 0.02734 seconds (files took 1.61 seconds to load)
1 example, 0 failures
Thank you guys, that’s all from me.
Bye bye ✋