Skip to main content

Consumer

The Consumer interface provides a way to retrieve and commit messages from a Kafka topic. It has two methods:

  • GetMessage() *kafka.Message: This method retrieves a message from the Kafka topic. It returns a pointer to a kafka.Message struct, which contains the message value, key, and other metadata.
  • Commit(message *kafka.Message) error: This method commits a message to the Kafka topic. It takes a pointer to a kafka.Message struct as an argument, and returns an error if the commit fails.
tip

While it is possible to use the Consumer interface directly, it is intended to be used through the consumer middleware

Example

Here is an example of how to use the Consumer interface to retrieve and commit messages from a Kafka topic:

tip

Please check configuration page for detailed configuration option

package main

import (
"github.com/honestbank/kp/v2/consumer"
)

func main() {
c, err := consumer.New([]string{"topic-1", "topic-2"}, getConfig())
if err != nil {
// handle err
}
msg := c.GetMessage()
if msg == nil {
// handle error or no message available
}

// Process the message
// ...

// Commit the message to the Kafka topic
if err := c.Commit(msg); err != nil {
// handle error
}
}

func getConfig() any {
return nil // return your config
}

Notes

  • If the GetMessage() method returns nil, it could mean that there are no messages available or that an error occurred, generally errors are recovered internally by confluent client.
  • The Commit() method should only be called after you have successfully processed the message and are ready to commit it to the Kafka topic.