High-performance Go SDK for building distributed streaming applications with StreamBus
This guide will walk you through setting up and using the StreamBus Go SDK to build your first streaming application.
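Before you begin, make sure you have a working Go toolchain installed and a StreamBus broker reachable at localhost:9092 (the address used by the examples below).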
# Create a directory for the project and switch into it
mkdir my-streambus-app
cd my-streambus-app
# Initialize a Go module named my-streambus-app
go mod init my-streambus-app
# Add the StreamBus SDK to the module's dependencies
go get github.com/gstreamio/streambus-sdk
import "github.com/gstreamio/streambus-sdk/client"
Let’s create a simple producer that sends messages to StreamBus:
package main
import (
"fmt"
"log"
"time"
"github.com/gstreamio/streambus-sdk/client"
)
// main connects to the StreamBus broker at localhost:9092, attempts to create
// the "my-events" topic, and then sends ten keyed messages to it, pausing one
// second after each successful send.
func main() {
	// Create configuration
	config := client.DefaultConfig()
	config.Brokers = []string{"localhost:9092"}

	// Connect to StreamBus
	c, err := client.New(config)
	if err != nil {
		// Use an explicit format verb: log.Fatal("...:", err) concatenates the
		// two operands without a space because one of them is a string.
		log.Fatalf("Failed to connect: %v", err)
	}
	defer c.Close()

	// Create a topic (if it doesn't exist). A failure is only logged, and the
	// program carries on to the send loop.
	// NOTE(review): 3 and 1 are presumably the partition count and the
	// replication factor — confirm against the CreateTopic signature.
	if err := c.CreateTopic("my-events", 3, 1); err != nil {
		log.Printf("Topic creation: %v", err)
	}

	// Create a producer
	producer := client.NewProducer(c)
	defer producer.Close()

	// Send 10 messages
	for i := 0; i < 10; i++ {
		key := []byte(fmt.Sprintf("key-%d", i))
		// Format the timestamp explicitly. Printing a time.Time with %s also
		// emits nanoseconds, the zone and the monotonic clock reading.
		sentAt := time.Now().Format("2006-01-02 15:04:05")
		value := []byte(fmt.Sprintf("Message #%d sent at %s", i, sentAt))

		if err := producer.Send("my-events", key, value); err != nil {
			// A failed send is logged and skipped; the loop moves straight on
			// to the next message without sleeping.
			log.Printf("Failed to send message: %v", err)
			continue
		}

		fmt.Printf("✓ Sent message %d\n", i)
		time.Sleep(time.Second)
	}

	fmt.Println("Producer finished!")
}
go run producer.go
You should see output like:
✓ Sent message 0
✓ Sent message 1
✓ Sent message 2
...
Producer finished!
Now let’s create a consumer to read those messages:
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/gstreamio/streambus-sdk/client"
)
// main runs the consumer and exits with a non-zero status if it fails.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run connects to the StreamBus broker at localhost:9092, seeks the consumer
// for partition 0 of "my-events" to offset 0, and prints every fetched record
// until SIGINT or SIGTERM is received.
//
// Failures are returned instead of being passed to log.Fatal so that the
// deferred Close calls run before the process exits; log.Fatal calls os.Exit,
// which skips deferred functions. The error text keeps the guide's
// user-facing wording.
func run() error {
	// Create configuration
	config := client.DefaultConfig()
	config.Brokers = []string{"localhost:9092"}

	// Connect to StreamBus
	c, err := client.New(config)
	if err != nil {
		return fmt.Errorf("Failed to connect: %w", err)
	}
	defer c.Close()

	// Create consumer for partition 0
	consumer := client.NewConsumer(c, "my-events", 0)
	defer consumer.Close()

	// Start from the beginning
	if err := consumer.Seek(0); err != nil {
		return fmt.Errorf("Failed to seek: %w", err)
	}

	// Set up signal handling for graceful shutdown. The channel is buffered
	// so a signal that arrives while a fetch is in progress is not dropped.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(signals)

	fmt.Println("Consumer started. Press Ctrl+C to stop...")

	// Consume messages. The signal channel is polled once per iteration, so a
	// shutdown request is only noticed between calls to Fetch.
	for {
		select {
		case <-signals:
			fmt.Println("\nShutting down...")
			return nil
		default:
			record, err := consumer.Fetch()
			if err != nil {
				// NOTE(review): a persistent fetch error makes this loop
				// retry immediately with no pause; consider adding a backoff
				// before using this pattern in production.
				log.Printf("Fetch error: %v", err)
				continue
			}

			fmt.Printf("📨 Received: key=%s, value=%s, offset=%d\n",
				string(record.Key), string(record.Value), record.Offset)
		}
	}
}
go run consumer.go
You should see the messages being consumed:
Consumer started. Press Ctrl+C to stop...
📨 Received: key=key-0, value=Message #0 sent at 2024-01-15 10:30:00, offset=0
📨 Received: key=key-1, value=Message #1 sent at 2024-01-15 10:30:01, offset=1
...
Consumer groups allow multiple consumers to work together to process messages:
package main
import (
	"context"
	"errors"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/gstreamio/streambus-sdk/client"
)
// main runs the group consumer and exits with a non-zero status if it fails.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run connects to the StreamBus broker at localhost:9092, joins the consumer
// group "my-consumer-group" for the topic "my-events", and prints every record
// passed to the handler until SIGINT or SIGTERM cancels the context.
//
// Failures are returned instead of being passed to log.Fatal so that the
// deferred cleanup (Close calls, cancel, signal.Stop) runs before the process
// exits; log.Fatal calls os.Exit, which skips deferred functions. The error
// text keeps the guide's user-facing wording.
func run() error {
	// Create configuration
	config := client.DefaultConfig()
	config.Brokers = []string{"localhost:9092"}

	// Connect to StreamBus
	c, err := client.New(config)
	if err != nil {
		return fmt.Errorf("Failed to connect: %w", err)
	}
	defer c.Close()

	// Configure consumer group
	groupConfig := &client.GroupConsumerConfig{
		GroupID: "my-consumer-group",
		Topics:  []string{"my-events"},
	}

	// Create group consumer
	consumer, err := client.NewGroupConsumer(c, groupConfig)
	if err != nil {
		return fmt.Errorf("Failed to create group consumer: %w", err)
	}
	defer consumer.Close()

	// Set up context for cancellation
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Handle shutdown signals
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(signals)

	go func() {
		select {
		case <-signals:
			fmt.Println("\nShutting down...")
			cancel()
		case <-ctx.Done():
			// The context was cancelled when run returned; let the goroutine
			// exit instead of waiting for a signal that may never arrive.
		}
	}()

	// Start consuming with message handler
	fmt.Println("Group consumer started. Press Ctrl+C to stop...")

	err = consumer.Start(ctx, func(record *client.Record) error {
		fmt.Printf("📨 Group consumed: topic=%s, partition=%d, offset=%d\n",
			record.Topic, record.Partition, record.Offset)
		fmt.Printf(" Key: %s\n", string(record.Key))
		fmt.Printf(" Value: %s\n", string(record.Value))

		// Process the message here
		// Return error to retry, nil to commit
		return nil
	})

	// Cancellation is the expected way to stop. errors.Is also recognises a
	// context.Canceled that has been wrapped with extra context, which a plain
	// != comparison would report as a failure.
	if err != nil && !errors.Is(err, context.Canceled) {
		return fmt.Errorf("Consumer error: %w", err)
	}
	return nil
}
Open multiple terminals and run the same consumer:
# Terminal 1
go run group_consumer.go
# Terminal 2
go run group_consumer.go
# Terminal 3
go run group_consumer.go
StreamBus will automatically distribute partitions among the consumers in the group.
// Build the client configuration field by field, starting from an empty Config.
config := new(client.Config)

// Where to find the brokers
config.Brokers = []string{"broker1:9092", "broker2:9092"}

// How long to wait when connecting and when issuing a request
config.ConnectTimeout = 10 * time.Second
config.RequestTimeout = 30 * time.Second

// Connection pool sizing and idle expiry
config.MaxIdleConns = 10
config.MaxConnsPerHost = 100
config.IdleConnTimeout = 90 * time.Second

// Retry count and the delay between attempts
config.MaxRetries = 3
config.RetryBackoff = 100 * time.Millisecond
config.Security = &client.SecurityConfig{
TLS: &client.TLSConfig{
Enabled: true,
CAFile: "/path/to/ca.crt",
CertFile: "/path/to/client.crt",
KeyFile: "/path/to/client.key",
ServerName: "streambus.example.com",
},
}
config.Security = &client.SecurityConfig{
SASL: &client.SASLConfig{
Enabled: true,
Mechanism: "SCRAM-SHA-256",
Username: "myuser",
Password: "mypassword",
},
}
Always handle errors appropriately in production:
// Send one message and classify any failure so each kind of error gets its
// own recovery path. Logging goes through the standard library log package
// used by the rest of this guide, which has Print-style functions only (no
// Error or Warn).
if err := producer.Send("topic", key, value); err != nil {
	switch {
	case errors.Is(err, client.ErrConnectionFailed):
		// Reconnect or circuit break
		log.Println("Connection failed, implementing backoff...")
		time.Sleep(5 * time.Second)
	case errors.Is(err, client.ErrTimeout):
		// Retry with backoff
		log.Println("Request timed out, retrying...")
	default:
		// Log and handle unexpected errors
		log.Printf("Unexpected error: %v", err)
	}
}
// Add metrics collection
metrics := &client.Metrics{
OnSend: func(topic string, latency time.Duration, err error) {
// Record metrics
},
OnFetch: func(topic string, partition int, count int) {
// Record metrics
},
}
config.Metrics = metrics
Congratulations! You’ve learned the basics of the StreamBus SDK. Here’s what to explore next: