High-performance Go SDK for building distributed streaming applications with StreamBus
Understanding the internal architecture of the StreamBus Go SDK.
The StreamBus SDK is built on several core principles:
┌───────────────────────────────────────────────┐
│                  Application                  │
└────────┬─────────────────────────────┬────────┘
         │                             │
 ┌───────▼───────┐             ┌───────▼───────┐
 │   Producer    │             │   Consumer    │
 └───────┬───────┘             └───────┬───────┘
         │                             │
┌────────▼─────────────────────────────▼────────┐
│                  Client Core                  │
│  ┌─────────────────────────────────────────┐  │
│  │         Connection Pool Manager         │  │
│  └─────────────────────────────────────────┘  │
│  ┌─────────────────────────────────────────┐  │
│  │            Protocol Handler             │  │
│  └─────────────────────────────────────────┘  │
│  ┌─────────────────────────────────────────┐  │
│  │               Buffer Pool               │  │
│  └─────────────────────────────────────────┘  │
└───────────────────────┬───────────────────────┘
                        │
┌───────────────────────▼───────────────────────┐
│               StreamBus Brokers               │
└───────────────────────────────────────────────┘
The Client is the central component that manages:
Efficient connection management:
Low-level protocol implementation:
Memory optimization:
Application → Producer.Send()
↓
Partition Selection
↓
Message Serialization
↓
Compression (optional)
↓
Protocol Encoding
↓
Connection Pool → Broker
↓
Acknowledgment ← Broker
↓
Application Callback
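The partition selection step in the flow above is not specified further here. One common approach is to hash the message key so that records with the same key always land on the same partition. The following is a minimal sketch under that assumption; the `partitioner` type, the `selectPartition` method, the FNV-1a hash, and the round-robin fallback for keyless messages are all illustrative and not confirmed SDK behavior.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync/atomic"
)

// partitioner is a hypothetical helper; the SDK's real partitioner may differ.
type partitioner struct {
	next atomic.Uint32 // round-robin counter for messages without a key
}

// selectPartition maps a key to a partition in [0, numPartitions).
// Keyless messages rotate across partitions; keyed messages are hashed.
func (p *partitioner) selectPartition(key []byte, numPartitions uint32) uint32 {
	if numPartitions == 0 {
		return 0
	}
	if len(key) == 0 {
		return (p.next.Add(1) - 1) % numPartitions
	}
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	return h.Sum32() % numPartitions
}

func main() {
	p := &partitioner{}
	a := p.selectPartition([]byte("user-42"), 8)
	b := p.selectPartition([]byte("user-42"), 8)
	fmt.Println("same key, same partition:", a == b)
	fmt.Println("keyless:", p.selectPartition(nil, 8), p.selectPartition(nil, 8))
}
```

Hashing preserves per-key ordering within a partition, while the round-robin fallback spreads keyless traffic evenly.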
Broker → Connection Pool
↓
Protocol Decoding
↓
Decompression (optional)
↓
Message Deserialization
↓
Application Handler
↓
Offset Commit → Broker
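The last two steps of the consumer flow above (handler, then offset commit) can be sketched as follows. This assumes at-least-once delivery, where an offset is committed only after the handler succeeds; the `Message` type and the `consume`, `handler`, and `commit` names are hypothetical stand-ins, not the SDK's API.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a hypothetical, minimal view of a fetched record.
type Message struct {
	Offset int64
	Value  []byte
}

// errPoison is an example handler failure used in main.
var errPoison = errors.New("poison message")

// consume runs handler on each message in order and commits the next offset
// only after the handler succeeds. It stops at the first failure so the
// failed message is fetched again later (at-least-once semantics).
func consume(msgs []Message, handler func(Message) error, commit func(next int64) error) error {
	for _, m := range msgs {
		if err := handler(m); err != nil {
			return fmt.Errorf("handler failed at offset %d: %w", m.Offset, err)
		}
		if err := commit(m.Offset + 1); err != nil {
			return fmt.Errorf("commit failed at offset %d: %w", m.Offset, err)
		}
	}
	return nil
}

func main() {
	msgs := []Message{{0, []byte("a")}, {1, []byte("b")}, {2, []byte("bad")}}
	var committed int64
	err := consume(msgs,
		func(m Message) error {
			if string(m.Value) == "bad" {
				return errPoison
			}
			return nil
		},
		func(next int64) error { committed = next; return nil },
	)
	fmt.Println("committed:", committed, "error:", err)
}
```

Committing after the handler trades possible duplicate processing for no message loss; committing before the handler would give at-most-once behavior instead.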
The SDK implements multiple layers of failure handling:
type BufferPool struct {
	small  sync.Pool // < 4KB
	medium sync.Pool // 4KB - 64KB
	large  sync.Pool // > 64KB
}
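To illustrate how a size-tiered pool like this might be used, here is a minimal sketch. The struct matches the one above, but `NewBufferPool`, `Get`, `Put`, and the exact tier boundaries are assumptions made for the example rather than the SDK's actual methods.

```go
package main

import (
	"fmt"
	"sync"
)

const (
	smallSize  = 4 << 10  // 4KB
	mediumSize = 64 << 10 // 64KB
)

type BufferPool struct {
	small  sync.Pool // < 4KB
	medium sync.Pool // 4KB - 64KB
	large  sync.Pool // > 64KB
}

// newBuf returns a sync.Pool constructor for buffers of the given capacity.
func newBuf(capacity int) func() interface{} {
	return func() interface{} {
		b := make([]byte, 0, capacity)
		return &b // pointers avoid an extra allocation on Put
	}
}

// NewBufferPool is a hypothetical constructor.
func NewBufferPool() *BufferPool {
	p := &BufferPool{}
	p.small.New = newBuf(smallSize)
	p.medium.New = newBuf(mediumSize)
	p.large.New = newBuf(0) // large buffers are sized on demand in Get
	return p
}

// Get returns an empty buffer with capacity of at least n bytes.
func (p *BufferPool) Get(n int) *[]byte {
	pool := &p.large
	switch {
	case n < smallSize:
		pool = &p.small
	case n <= mediumSize:
		pool = &p.medium
	}
	bp := pool.Get().(*[]byte)
	if cap(*bp) < n {
		*bp = make([]byte, 0, n)
	}
	return bp
}

// Put resets the buffer and returns it to the tier matching its capacity.
// Buffers smaller than the small tier are dropped.
func (p *BufferPool) Put(bp *[]byte) {
	*bp = (*bp)[:0]
	switch c := cap(*bp); {
	case c > mediumSize:
		p.large.Put(bp)
	case c == mediumSize:
		p.medium.Put(bp)
	case c >= smallSize:
		p.small.Put(bp)
	}
}

func main() {
	pool := NewBufferPool()
	buf := pool.Get(512)
	*buf = append(*buf, "payload"...)
	fmt.Println(len(*buf), cap(*buf) >= 512)
	pool.Put(buf)
}
```

Storing `*[]byte` rather than `[]byte` in `sync.Pool` avoids allocating a new interface value each time a buffer is returned.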
All public APIs are thread-safe:
// Producer goroutines
- Network I/O worker
- Batch accumulator
- Retry handler
// Consumer goroutines
- Fetch worker
- Offset committer
- Heartbeat sender
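As an illustration of one of the background goroutines listed above, the sketch below shows how a batch accumulator might group messages before handing them to a network worker. The `accumulate` function, its channel-based interface, and the size and linger triggers are assumptions for the example; the SDK's internal worker is not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// accumulate groups incoming messages into batches. A batch is emitted when
// it reaches maxBatch messages or when the linger interval ticks, whichever
// comes first. When in is closed, the remainder is flushed and out is closed.
func accumulate(in <-chan []byte, out chan<- [][]byte, maxBatch int, linger time.Duration) {
	defer close(out)
	ticker := time.NewTicker(linger)
	defer ticker.Stop()

	var batch [][]byte
	flush := func() {
		if len(batch) > 0 {
			out <- batch
			batch = nil
		}
	}
	for {
		select {
		case msg, ok := <-in:
			if !ok {
				flush()
				return
			}
			batch = append(batch, msg)
			if len(batch) >= maxBatch {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}

func main() {
	in := make(chan []byte)
	out := make(chan [][]byte)
	go accumulate(in, out, 3, time.Second)
	go func() {
		for i := 0; i < 7; i++ {
			in <- []byte(fmt.Sprintf("msg-%d", i))
		}
		close(in)
	}()
	total := 0
	for batch := range out {
		total += len(batch)
	}
	fmt.Println("messages delivered:", total)
}
```

Because the accumulator owns the batch slice and communicates only over channels, callers on any goroutine can send to `in` without extra locking.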
Supported mechanisms:
┌─────────────┬──────────────┬─────────────┬──────────┐
│ Magic Bytes │ Message Type │ Correlation │ Payload  │
│ (4 bytes)   │ (2 bytes)    │ (4 bytes)   │ Variable │
└─────────────┴──────────────┴─────────────┴──────────┘
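The frame layout above can be encoded and decoded with `encoding/binary`. The sketch below is illustrative only: the big-endian byte order, the `SBUS` magic value, and the `Frame`, `Encode`, and `DecodeFrame` names are assumptions, since the layout specifies field sizes but not these details.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
)

// headerSize is magic (4) + message type (2) + correlation ID (4).
const headerSize = 4 + 2 + 4

// magic is a placeholder; the real magic bytes are not given in this document.
var magic = []byte("SBUS")

// Frame is a hypothetical in-memory view of one wire frame.
type Frame struct {
	Type          uint16
	CorrelationID uint32
	Payload       []byte
}

// Encode lays the frame out as magic, type, correlation ID, payload.
func (f Frame) Encode() []byte {
	buf := make([]byte, headerSize+len(f.Payload))
	copy(buf[0:4], magic)
	binary.BigEndian.PutUint16(buf[4:6], f.Type)
	binary.BigEndian.PutUint32(buf[6:10], f.CorrelationID)
	copy(buf[headerSize:], f.Payload)
	return buf
}

// DecodeFrame validates the magic bytes and splits the header from the payload.
// The returned Payload aliases buf rather than copying it.
func DecodeFrame(buf []byte) (Frame, error) {
	if len(buf) < headerSize {
		return Frame{}, errors.New("frame shorter than header")
	}
	if !bytes.Equal(buf[:4], magic) {
		return Frame{}, errors.New("bad magic bytes")
	}
	return Frame{
		Type:          binary.BigEndian.Uint16(buf[4:6]),
		CorrelationID: binary.BigEndian.Uint32(buf[6:10]),
		Payload:       buf[headerSize:],
	}, nil
}

func main() {
	raw := Frame{Type: 1, CorrelationID: 42, Payload: []byte("hello")}.Encode()
	f, err := DecodeFrame(raw)
	fmt.Println(len(raw), f.Type, f.CorrelationID, string(f.Payload), err)
	// prints: 15 1 42 hello <nil>
}
```

The correlation ID lets a client match a response to its request when several requests are in flight on one connection.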
┌──────────┬──────────┬───────────┬──────────┬─────────┐
│ Headers  │ Key      │ Value     │ Timestamp│ CRC     │
│ Variable │ Variable │ Variable  │ 8 bytes  │ 4 bytes │
└──────────┴──────────┴───────────┴──────────┴─────────┘
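The message layout above ends with a 4-byte CRC, which lets a receiver detect corruption. The sketch below shows one way this could work. It assumes each variable-length field carries a 4-byte big-endian length prefix, that headers are treated as opaque bytes, and that the checksum is CRC-32 (IEEE) over all preceding bytes; none of these details are stated in the layout, and `EncodeMessage` and `VerifyCRC` are hypothetical names.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

// appendField writes a 4-byte length prefix followed by the field bytes.
// The length prefix is an assumed framing for the variable-length fields.
func appendField(dst, field []byte) []byte {
	dst = binary.BigEndian.AppendUint32(dst, uint32(len(field)))
	return append(dst, field...)
}

// EncodeMessage lays out headers, key, value, timestamp, then a CRC-32 of
// everything written before it.
func EncodeMessage(headers, key, value []byte, timestamp int64) []byte {
	var buf []byte
	buf = appendField(buf, headers)
	buf = appendField(buf, key)
	buf = appendField(buf, value)
	buf = binary.BigEndian.AppendUint64(buf, uint64(timestamp))
	return binary.BigEndian.AppendUint32(buf, crc32.ChecksumIEEE(buf))
}

// VerifyCRC recomputes the checksum over the body and compares it with the
// trailing 4 bytes.
func VerifyCRC(msg []byte) error {
	if len(msg) < 4 {
		return errors.New("message too short for CRC")
	}
	body := msg[:len(msg)-4]
	want := binary.BigEndian.Uint32(msg[len(msg)-4:])
	if crc32.ChecksumIEEE(body) != want {
		return errors.New("CRC mismatch")
	}
	return nil
}

func main() {
	msg := EncodeMessage(nil, []byte("order-1"), []byte(`{"qty":2}`), 1700000000000)
	fmt.Println("intact:", VerifyCRC(msg))

	msg[6] ^= 0xFF // simulate corruption in transit
	fmt.Println("corrupted:", VerifyCRC(msg))
}
```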
type Serializer interface {
	Serialize(data interface{}) ([]byte, error)
	Deserialize(bytes []byte, target interface{}) error
}
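As an example of implementing this interface, here is a minimal JSON-backed serializer built on the standard library. The `JSONSerializer` and `Order` types are illustrative names for this sketch and are not part of the SDK.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Serializer interface {
	Serialize(data interface{}) ([]byte, error)
	Deserialize(bytes []byte, target interface{}) error
}

// JSONSerializer is a hypothetical Serializer backed by encoding/json.
type JSONSerializer struct{}

func (JSONSerializer) Serialize(data interface{}) ([]byte, error) {
	return json.Marshal(data)
}

// Deserialize expects target to be a pointer, as json.Unmarshal does.
func (JSONSerializer) Deserialize(bytes []byte, target interface{}) error {
	return json.Unmarshal(bytes, target)
}

// Order is an example payload type.
type Order struct {
	ID     string `json:"id"`
	Amount int    `json:"amount"`
}

func main() {
	var s Serializer = JSONSerializer{}

	raw, err := s.Serialize(Order{ID: "o-1", Amount: 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw)) // {"id":"o-1","amount":3}

	var decoded Order
	if err := s.Deserialize(raw, &decoded); err != nil {
		panic(err)
	}
	fmt.Println(decoded.ID, decoded.Amount) // o-1 3
}
```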
type ProducerInterceptor interface {
	OnSend(record *Record) *Record
	OnAcknowledgment(metadata *Metadata, err error)
}
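An interceptor can enrich records before they are sent and observe the result afterwards. The sketch below assumes minimal shapes for `Record` and `Metadata`, since their fields are not shown in this document; `TracingInterceptor` and the `x-source` header name are likewise hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// Record and Metadata are assumed, minimal stand-ins for the SDK types.
type Record struct {
	Topic   string
	Value   []byte
	Headers map[string]string
}

type Metadata struct {
	Topic     string
	Partition int
	Offset    int64
}

type ProducerInterceptor interface {
	OnSend(record *Record) *Record
	OnAcknowledgment(metadata *Metadata, err error)
}

// errBrokerDown is an example failure used in main.
var errBrokerDown = errors.New("broker unavailable")

// TracingInterceptor stamps each record with its source and counts outcomes.
// Counters are atomic because callbacks may run on different goroutines.
type TracingInterceptor struct {
	Source string
	acked  atomic.Int64
	failed atomic.Int64
}

func (t *TracingInterceptor) OnSend(record *Record) *Record {
	if record.Headers == nil {
		record.Headers = make(map[string]string)
	}
	record.Headers["x-source"] = t.Source
	return record
}

func (t *TracingInterceptor) OnAcknowledgment(metadata *Metadata, err error) {
	if err != nil {
		t.failed.Add(1)
		return
	}
	t.acked.Add(1)
}

func main() {
	ti := &TracingInterceptor{Source: "checkout-service"}
	var pi ProducerInterceptor = ti

	rec := pi.OnSend(&Record{Topic: "orders", Value: []byte("payload")})
	fmt.Println(rec.Headers["x-source"]) // checkout-service

	pi.OnAcknowledgment(&Metadata{Topic: "orders", Offset: 10}, nil)
	pi.OnAcknowledgment(nil, errBrokerDown)
	fmt.Println(ti.acked.Load(), ti.failed.Load()) // 1 1
}
```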
type MetricsCollector interface {
	RecordSend(topic string, latency time.Duration)
	RecordFetch(topic string, partition int, count int)
	RecordError(operation string, err error)
}
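For local testing, a collector can simply keep counters in memory. The sketch below is one possible implementation; `InMemoryMetrics`, `NewInMemoryMetrics`, and `AvgSendLatency` are names invented for this example, and a production collector would more likely forward to a metrics backend.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type MetricsCollector interface {
	RecordSend(topic string, latency time.Duration)
	RecordFetch(topic string, partition int, count int)
	RecordError(operation string, err error)
}

// errTimeout is an example error used in main.
var errTimeout = errors.New("request timed out")

// InMemoryMetrics is a hypothetical collector guarded by a mutex so it can
// be shared across producer and consumer goroutines.
type InMemoryMetrics struct {
	mu        sync.Mutex
	sends     map[string]int
	latency   map[string]time.Duration
	fetched   map[string]int // keyed by "topic/partition"
	errCounts map[string]int // keyed by operation
}

func NewInMemoryMetrics() *InMemoryMetrics {
	return &InMemoryMetrics{
		sends:     make(map[string]int),
		latency:   make(map[string]time.Duration),
		fetched:   make(map[string]int),
		errCounts: make(map[string]int),
	}
}

func (m *InMemoryMetrics) RecordSend(topic string, latency time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.sends[topic]++
	m.latency[topic] += latency
}

func (m *InMemoryMetrics) RecordFetch(topic string, partition int, count int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.fetched[fmt.Sprintf("%s/%d", topic, partition)] += count
}

// RecordError ignores nil errors so callers can pass results through directly.
func (m *InMemoryMetrics) RecordError(operation string, err error) {
	if err == nil {
		return
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.errCounts[operation]++
}

// AvgSendLatency returns the mean send latency for a topic, or 0 if none.
func (m *InMemoryMetrics) AvgSendLatency(topic string) time.Duration {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.sends[topic] == 0 {
		return 0
	}
	return m.latency[topic] / time.Duration(m.sends[topic])
}

func main() {
	m := NewInMemoryMetrics()
	var mc MetricsCollector = m
	mc.RecordSend("orders", 10*time.Millisecond)
	mc.RecordSend("orders", 30*time.Millisecond)
	mc.RecordFetch("orders", 0, 25)
	mc.RecordError("send", errTimeout)
	fmt.Println(m.AvgSendLatency("orders")) // 20ms
}
```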
See our Contributing Guide for: