Skip to main content

Backoff

The backoff middleware slows down the processing of messages when errors occur. It uses a backoff policy to determine how long to wait between processing attempts.

Example

danger

Adding multiple backoff middleware is possible.

package main

import (
"context"
"fmt"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
backoff_policy "github.com/honestbank/backoff-policy"
"github.com/honestbank/backoff-policy/policies"
v2 "github.com/honestbank/kp/v2"
"github.com/honestbank/kp/v2/middlewares/backoff"
)

func main() {
kp := v2.New[kafka.Message]()
exponent, duration, maxBackoffCount := 1.5, time.Millisecond*200, 10
backoffPolicy := backoff_policy.NewBackoff(policies.GetExponentialPolicy(exponent, duration, maxBackoffCount))
kp.AddMiddleware(backoff.NewBackoffMiddleware(backoffPolicy)) // simply add a backoff middleware to back off.
err := kp.Process(processUserLoggedInEvent)
if err != nil {
panic(err) // do better error handling
}
}

func processUserLoggedInEvent(ctx context.Context, message *kafka.Message) error {
// here, you can focus on your business logic.
fmt.Printf("processing %v\n", message)
time.Sleep(time.Millisecond * 200) // simulate long running process
return nil // or error
}

func getConfig() any {
return nil // return your config
}