Files
go-trustlog/api/adapter/tcp_subscriber.go
ryan d313449c5c refactor: 重构trustlog-sdk目录结构到trustlog/go-trustlog
- 将所有trustlog-sdk文件移动到trustlog/go-trustlog/目录
- 更新README中所有import路径从trustlog-sdk改为go-trustlog
- 更新cookiecutter配置文件中的项目名称
- 更新根目录.lefthook.yml以引用新位置的配置
- 添加go.sum文件到版本控制
- 删除过时的示例文件

这次重构与trustlog-server保持一致的目录结构,
为未来支持多语言SDK(Python、Java等)预留空间。
2025-12-22 13:37:57 +08:00

311 lines
7.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package adapter
import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"net"
	"sync"
	"time"

	"github.com/ThreeDotsLabs/watermill/message"
	"go.yandata.net/iod/iod/trustlog-sdk/api/logger"
)
// Subscriber configuration constants.
//
// defaultOutputChannelSize is used when the configured OutputChannelSize is
// zero or negative. minOutputChannelSize and maxOutputChannelSize are the
// bounds that NewTCPSubscriber clamps the configured value to.
const (
defaultOutputChannelSize = 100
minOutputChannelSize = 10
maxOutputChannelSize = 10000
)
// Predefined sentinel errors.
//
// ErrListenAddrRequired is returned by NewTCPSubscriber when the configured
// ListenAddr is empty. ErrSubscriberClosed is returned by Subscribe once the
// subscriber has been closed.
var (
ErrListenAddrRequired = errors.New("listen address is required")
ErrSubscriberClosed = errors.New("subscriber is closed")
)
// TCPSubscriberConfig holds the settings of a TCP subscriber.
type TCPSubscriberConfig struct {
// ListenAddr is the address to listen on, in "host:port" form.
// NewTCPSubscriber rejects an empty value with ErrListenAddrRequired.
ListenAddr string
// OutputChannelSize is the buffer size of each subscription's output channel.
// Smaller values (e.g. 10-50) propagate backpressure faster but may reduce throughput.
// Larger values (e.g. 500-1000) give higher throughput but propagate backpressure more slowly.
// The default is 100, balancing throughput and backpressure; NewTCPSubscriber
// applies it for values <= 0 and clamps other values to [10, 10000].
OutputChannelSize int
}
// TCPSubscriber implements a TCP-based watermill Subscriber.
//
// It accepts TCP connections on a listener, decodes messages from them and
// hands each message to one of the channels subscribed to the message's topic.
type TCPSubscriber struct {
// config is the configuration after NewTCPSubscriber normalised OutputChannelSize.
config TCPSubscriberConfig
logger logger.Logger
// listener is the TCP listener opened by NewTCPSubscriber and closed by Close.
listener net.Listener
// subsLock guards subs.
subsLock sync.RWMutex
subs map[string][]chan *message.Message // topic -> output channels of its subscriptions
// closed is set to true by Close; it is guarded by closedMu.
closed bool
closedMu sync.RWMutex
// closeChan is closed by Close to tell the background goroutines to stop.
closeChan chan struct{}
// Connection management.
// connMu guards conns.
connMu sync.Mutex
// conns holds the accepted connections so that Close can close them.
conns []net.Conn
}
// NewTCPSubscriber creates a TCP subscriber that listens on config.ListenAddr.
//
// The configured OutputChannelSize is normalised before use: a zero or negative
// value becomes defaultOutputChannelSize, and any other value is clamped to
// [minOutputChannelSize, maxOutputChannelSize] with a warning when clamping
// happens. The normalised size is stored in the subscriber's own copy of config.
//
// On success the listener is open and a background goroutine is already
// accepting connections. It returns ErrListenAddrRequired when ListenAddr is
// empty, or a wrapped error when the address cannot be listened on.
func NewTCPSubscriber(config TCPSubscriberConfig, logger logger.Logger) (*TCPSubscriber, error) {
	if config.ListenAddr == "" {
		return nil, ErrListenAddrRequired
	}

	ctx := context.Background()

	// Normalise the output channel size. The default lies inside the allowed
	// range, so at most one of the branches below applies.
	size := config.OutputChannelSize
	switch {
	case size <= 0:
		size = defaultOutputChannelSize
	case size < minOutputChannelSize:
		size = minOutputChannelSize
		logger.WarnContext(ctx, "OutputChannelSize too small, using minimum",
			"configured", config.OutputChannelSize, "actual", minOutputChannelSize)
	case size > maxOutputChannelSize:
		size = maxOutputChannelSize
		logger.WarnContext(ctx, "OutputChannelSize too large, using maximum",
			"configured", config.OutputChannelSize, "actual", maxOutputChannelSize)
	}

	ln, err := net.Listen("tcp", config.ListenAddr)
	if err != nil {
		return nil, fmt.Errorf("failed to listen on %s: %w", config.ListenAddr, err)
	}

	// Remember the size that is actually in effect.
	config.OutputChannelSize = size

	sub := &TCPSubscriber{
		config:    config,
		logger:    logger,
		listener:  ln,
		subs:      make(map[string][]chan *message.Message),
		closeChan: make(chan struct{}),
		conns:     make([]net.Conn, 0),
	}

	// Accept incoming connections in the background.
	go sub.acceptConnections()

	logger.InfoContext(ctx, "TCP Subscriber listening",
		"addr", config.ListenAddr,
		"channel_size", size)

	return sub, nil
}
// acceptConnections accepts client connections until the subscriber is closed.
//
// Every accepted connection is recorded in s.conns (so Close can close it) and
// then served by its own handleConnection goroutine.
//
// When Accept fails and the subscriber is not closed, the error is logged and
// the loop waits before retrying. The wait starts at 5ms and doubles on each
// consecutive failure up to 1s, so that a persistent failure (for example
// running out of file descriptors) does not turn into a busy loop that floods
// the log. The wait is interrupted as soon as closeChan is closed.
func (s *TCPSubscriber) acceptConnections() {
	const (
		minAcceptBackoff = 5 * time.Millisecond
		maxAcceptBackoff = time.Second
	)

	ctx := context.Background()
	var backoff time.Duration

	for {
		// Stop promptly once Close has been called.
		select {
		case <-s.closeChan:
			s.logger.InfoContext(ctx, "Stopping connection acceptor")
			return
		default:
		}

		conn, err := s.listener.Accept()
		if err != nil {
			s.closedMu.RLock()
			closed := s.closed
			s.closedMu.RUnlock()
			if closed {
				// The error comes from shutting down; it is not worth logging.
				return
			}

			s.logger.ErrorContext(ctx, "Failed to accept connection", "error", err)

			// Back off before retrying so a persistent error cannot spin the CPU.
			if backoff == 0 {
				backoff = minAcceptBackoff
			} else {
				backoff *= 2
			}
			if backoff > maxAcceptBackoff {
				backoff = maxAcceptBackoff
			}
			timer := time.NewTimer(backoff)
			select {
			case <-timer.C:
			case <-s.closeChan:
				// The check at the top of the loop logs and returns.
				timer.Stop()
			}
			continue
		}
		backoff = 0

		s.logger.InfoContext(ctx, "Accepted new connection", "remote", conn.RemoteAddr().String())

		// Track the connection so Close can close it.
		s.connMu.Lock()
		s.conns = append(s.conns, conn)
		s.connMu.Unlock()

		// Serve each connection in its own goroutine.
		go s.handleConnection(conn)
	}
}
// handleConnection serves a single client connection.
//
// It decodes messages from conn in a loop and passes every data message to
// handleMessage; messages of any other type are logged and skipped. The loop
// ends when the subscriber is closed or when decoding fails (the failure is
// logged unless the subscriber is already closed).
//
// On exit the connection is removed from s.conns and closed. Without the
// removal, s.conns would keep a reference to every connection ever accepted
// and grow without bound on a long-running subscriber.
func (s *TCPSubscriber) handleConnection(conn net.Conn) {
	ctx := context.Background()

	defer func() {
		// Forget the connection so the tracking slice does not grow forever.
		s.connMu.Lock()
		for i, c := range s.conns {
			if c == conn {
				last := len(s.conns) - 1
				s.conns[i] = s.conns[last]
				s.conns[last] = nil // drop the reference held by the backing array
				s.conns = s.conns[:last]
				break
			}
		}
		s.connMu.Unlock()

		// Best-effort close: Close may already have closed this connection.
		conn.Close()
		s.logger.InfoContext(ctx, "Connection closed", "remote", conn.RemoteAddr().String())
	}()

	for {
		select {
		case <-s.closeChan:
			return
		default:
		}

		// Read the next message from the connection.
		tcpMsg, err := DecodeTCPMessage(conn)
		if err != nil {
			s.closedMu.RLock()
			closed := s.closed
			s.closedMu.RUnlock()
			if closed {
				// The read failed because the subscriber is shutting down.
				return
			}
			s.logger.ErrorContext(ctx, "Failed to decode message", "error", err)
			return
		}

		if tcpMsg.Type != MessageTypeData {
			s.logger.WarnContext(ctx, "Unexpected message type", "type", tcpMsg.Type)
			continue
		}

		s.handleMessage(ctx, conn, tcpMsg)
	}
}
// handleMessage dispatches one decoded message to a subscriber of its topic.
//
// Delivery is "sent means successful": no ACK/NACK is written back to the
// client, and the watermill message is acked as soon as it has been placed on
// a subscriber channel. A message whose topic has no subscribers is logged and
// dropped.
//
// One subscriber channel of the topic is picked at random. The send blocks
// while that channel is full, which is what produces TCP backpressure: this
// goroutine stops reading from the connection, the receive window fills up and
// the sender is eventually throttled. The send is abandoned when the
// subscriber is closed.
//
// The read lock on subsLock is held for the whole dispatch, including the
// blocking send. Close closes closeChan before it takes the write lock to
// close the subscription channels, so a blocked send is released first and a
// channel can never be closed while a send on it is in flight (which would
// panic with "send on closed channel"). The cost is that Subscribe waits while
// a dispatch is blocked on a full channel.
//
// conn is not used; it is kept so the signature stays unchanged.
func (s *TCPSubscriber) handleMessage(ctx context.Context, conn net.Conn, tcpMsg *TCPMessage) {
	s.logger.DebugContext(ctx, "Received message", "uuid", tcpMsg.UUID, "topic", tcpMsg.Topic)

	// Keep the subscription table stable (and its channels open) until the
	// message has been handed over or the subscriber is closing.
	s.subsLock.RLock()
	defer s.subsLock.RUnlock()

	channels := s.subs[tcpMsg.Topic]
	if len(channels) == 0 {
		s.logger.WarnContext(ctx, "No subscribers for topic", "topic", tcpMsg.Topic)
		// No NACK is sent; the message is simply dropped.
		return
	}

	// Build the watermill message.
	msg := message.NewMessage(tcpMsg.UUID, tcpMsg.Payload)

	// Pick a subscriber at random.
	randomIndex := rand.Intn(len(channels))
	outputChan := channels[randomIndex]

	// Log how full the chosen channel is, to make backpressure observable.
	channelLen := len(outputChan)
	channelCap := cap(outputChan)
	usage := float64(channelLen) / float64(channelCap) * 100
	s.logger.DebugContext(ctx, "Dispatching message via random selection",
		"uuid", tcpMsg.UUID,
		"subscriber_index", randomIndex,
		"total_subscribers", len(channels),
		"channel_usage", fmt.Sprintf("%.1f%% (%d/%d)", usage, channelLen, channelCap))

	// Blocking send: a full channel stalls this goroutine and thereby the TCP
	// read loop of the connection.
	select {
	case outputChan <- msg:
		s.logger.DebugContext(ctx, "Message sent to subscriber", "uuid", tcpMsg.UUID, "index", randomIndex)
		// Sent means successful: ack immediately without waiting for processing.
		msg.Ack()
	case <-s.closeChan:
		s.logger.DebugContext(ctx, "Subscriber closed, message discarded", "uuid", tcpMsg.UUID)
	}
}
// The sendAck method has been removed.
// Delivery is "sent means successful": no ACK/NACK receipts are sent back, to improve performance.
// Subscribe registers a new subscription for topic and returns the channel on
// which its messages are delivered.
//
// The channel is buffered with the configured OutputChannelSize (falling back
// to defaultOutputChannelSize when that is not positive). It is closed by
// Close. When a topic has several subscriptions, each message goes to only one
// of them.
//
// ctx is used for logging only; cancelling it does not end the subscription.
//
// It returns ErrSubscriberClosed when the subscriber has been closed. The
// closed flag is checked while subsLock is held: Close sets the flag before it
// takes subsLock to close and remove all channels, so a subscription can no
// longer be registered after that cleanup and left with a channel that is
// never closed.
func (s *TCPSubscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
	// Use the configured channel size.
	channelSize := s.config.OutputChannelSize
	if channelSize <= 0 {
		channelSize = defaultOutputChannelSize
	}

	s.subsLock.Lock()

	s.closedMu.RLock()
	closed := s.closed
	s.closedMu.RUnlock()
	if closed {
		s.subsLock.Unlock()
		return nil, ErrSubscriberClosed
	}

	output := make(chan *message.Message, channelSize)
	// append works on the nil slice of a topic that has no subscriptions yet.
	s.subs[topic] = append(s.subs[topic], output)
	subscriberCount := len(s.subs[topic])
	s.subsLock.Unlock()

	s.logger.InfoContext(ctx, "Subscribed to topic",
		"topic", topic,
		"subscriber_count", subscriberCount,
		"channel_size", channelSize)

	return output, nil
}
// Close shuts the subscriber down.
//
// The first call marks the subscriber as closed, closes closeChan to signal
// the background goroutines, closes the listener (a failure is logged, not
// returned), closes every tracked connection, and finally closes and removes
// all subscription channels. Later calls do nothing. Close always returns nil.
func (s *TCPSubscriber) Close() error {
	ctx := context.Background()

	// Flip the closed flag; only the call that flips it does the shutdown.
	s.closedMu.Lock()
	alreadyClosed := s.closed
	s.closed = true
	s.closedMu.Unlock()
	if alreadyClosed {
		return nil
	}

	// Signal the goroutines before anything else is torn down.
	close(s.closeChan)

	// Stop accepting new connections.
	if s.listener != nil {
		if closeErr := s.listener.Close(); closeErr != nil {
			s.logger.ErrorContext(ctx, "Failed to close listener", "error", closeErr)
		}
	}

	// Close every tracked connection.
	s.connMu.Lock()
	for i := range s.conns {
		s.conns[i].Close()
	}
	s.connMu.Unlock()

	// Close every subscription channel and empty the subscription table.
	s.subsLock.Lock()
	for name, outputs := range s.subs {
		for i := range outputs {
			close(outputs[i])
		}
		delete(s.subs, name)
	}
	s.subsLock.Unlock()

	s.logger.InfoContext(ctx, "TCP Subscriber closed")
	return nil
}