package adapter

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"net"
	"sync"
	"time"

	"github.com/ThreeDotsLabs/watermill/message"
	"go.yandata.net/iod/iod/trustlog-sdk/api/logger"
)

// Bounds and default for the buffer size of each subscription's output channel.
const (
	defaultOutputChannelSize = 100
	minOutputChannelSize     = 10
	maxOutputChannelSize     = 10000
)

// Sentinel errors returned by TCPSubscriber.
var (
	// ErrListenAddrRequired is returned by NewTCPSubscriber when ListenAddr is empty.
	ErrListenAddrRequired = errors.New("listen address is required")
	// ErrSubscriberClosed is returned by Subscribe after Close has been called.
	ErrSubscriberClosed = errors.New("subscriber is closed")
)

// TCPSubscriberConfig configures a TCPSubscriber.
type TCPSubscriberConfig struct {
	// ListenAddr is the address to listen on, in "host:port" form.
	ListenAddr string

	// OutputChannelSize is the buffer size of every channel returned by Subscribe.
	// Smaller values (e.g. 10-50) propagate backpressure to senders sooner but may
	// reduce throughput; larger values (e.g. 500-1000) favor throughput at the
	// cost of slower backpressure. Values <= 0 select the default (100); other
	// values are clamped to [10, 10000] by NewTCPSubscriber.
	OutputChannelSize int
}

// TCPSubscriber is a watermill Subscriber that receives messages over TCP.
//
// Every accepted connection is served by its own goroutine. Each received data
// message is handed to one randomly chosen subscription channel of its topic and
// acked immediately (fire-and-forget: no ACK/NACK is sent back to the peer). A
// full output channel blocks that connection's reader, which lets TCP flow
// control push back on the sender.
type TCPSubscriber struct {
	config   TCPSubscriberConfig
	logger   logger.Logger
	listener net.Listener

	subsLock sync.RWMutex
	subs     map[string][]chan *message.Message // topic -> subscription channels

	closed    bool
	closedMu  sync.RWMutex
	closeChan chan struct{} // closed by Close to wake up every goroutine

	// connMu guards conns, the set of currently open client connections.
	connMu sync.Mutex
	conns  map[net.Conn]struct{}

	// wg tracks the accept loop and every connection handler so that Close can
	// wait for in-flight deliveries to stop before closing subscription channels.
	wg sync.WaitGroup
}

// NewTCPSubscriber validates config, starts listening on config.ListenAddr and
// begins accepting connections in a background goroutine.
//
// OutputChannelSize is normalized: values <= 0 become defaultOutputChannelSize,
// and values outside [minOutputChannelSize, maxOutputChannelSize] are clamped
// with a warning. It returns ErrListenAddrRequired when ListenAddr is empty and a
// wrapped error when the address cannot be listened on.
func NewTCPSubscriber(config TCPSubscriberConfig, logger logger.Logger) (*TCPSubscriber, error) {
	if config.ListenAddr == "" {
		return nil, ErrListenAddrRequired
	}

	channelSize := config.OutputChannelSize
	switch {
	case channelSize <= 0:
		channelSize = defaultOutputChannelSize
	case channelSize < minOutputChannelSize:
		channelSize = minOutputChannelSize
		logger.WarnContext(context.Background(), "OutputChannelSize too small, using minimum",
			"configured", config.OutputChannelSize,
			"actual", minOutputChannelSize)
	case channelSize > maxOutputChannelSize:
		channelSize = maxOutputChannelSize
		logger.WarnContext(context.Background(), "OutputChannelSize too large, using maximum",
			"configured", config.OutputChannelSize,
			"actual", maxOutputChannelSize)
	}

	listener, err := net.Listen("tcp", config.ListenAddr)
	if err != nil {
		return nil, fmt.Errorf("failed to listen on %s: %w", config.ListenAddr, err)
	}

	// Store the effective size so Subscribe uses the normalized value.
	config.OutputChannelSize = channelSize

	s := &TCPSubscriber{
		config:    config,
		logger:    logger,
		listener:  listener,
		subs:      make(map[string][]chan *message.Message),
		closeChan: make(chan struct{}),
		conns:     make(map[net.Conn]struct{}),
	}

	// The accept loop is counted in wg for as long as it runs, so the wg.Add
	// calls it makes for new connections never race with Close's wg.Wait.
	s.wg.Add(1)
	go s.acceptConnections()

	logger.InfoContext(context.Background(), "TCP Subscriber listening",
		"addr", config.ListenAddr,
		"channel_size", channelSize)

	return s, nil
}

// isClosed reports whether Close has been called.
func (s *TCPSubscriber) isClosed() bool {
	s.closedMu.RLock()
	defer s.closedMu.RUnlock()
	return s.closed
}

// acceptConnections accepts client connections until the subscriber is closed,
// starting one handleConnection goroutine per connection. Transient Accept
// errors are retried with a capped exponential backoff.
func (s *TCPSubscriber) acceptConnections() {
	defer s.wg.Done()

	const (
		minRetryDelay = 5 * time.Millisecond
		maxRetryDelay = time.Second
	)

	ctx := context.Background()
	var retryDelay time.Duration

	for {
		conn, err := s.listener.Accept()
		if err != nil {
			if s.isClosed() || errors.Is(err, net.ErrClosed) {
				s.logger.InfoContext(ctx, "Stopping connection acceptor")
				return
			}

			// Back off so a persistent failure (for example running out of file
			// descriptors) does not turn this loop into a busy spin.
			if retryDelay == 0 {
				retryDelay = minRetryDelay
			} else {
				retryDelay *= 2
			}
			if retryDelay > maxRetryDelay {
				retryDelay = maxRetryDelay
			}
			s.logger.ErrorContext(ctx, "Failed to accept connection", "error", err, "retry_in", retryDelay)

			select {
			case <-s.closeChan:
				s.logger.InfoContext(ctx, "Stopping connection acceptor")
				return
			case <-time.After(retryDelay):
			}
			continue
		}
		retryDelay = 0

		if !s.trackConn(conn) {
			// Close ran concurrently with Accept; trackConn already closed conn.
			return
		}

		s.logger.InfoContext(ctx, "Accepted new connection", "remote", conn.RemoteAddr().String())
		go s.handleConnection(conn)
	}
}

// trackConn registers conn so that Close can close it and accounts for its
// handler goroutine in wg. It returns false, after closing conn, when the
// subscriber is already closed. The closed check is made while holding connMu,
// and Close sets closed before it takes connMu. So a connection accepted
// concurrently with Close is either closed by Close or rejected here; it is
// never left open.
func (s *TCPSubscriber) trackConn(conn net.Conn) bool {
	s.connMu.Lock()
	defer s.connMu.Unlock()

	if s.isClosed() {
		_ = conn.Close() // best effort: the connection is being discarded
		return false
	}

	s.conns[conn] = struct{}{}
	s.wg.Add(1)
	return true
}

// untrackConn removes conn from the set of open connections.
func (s *TCPSubscriber) untrackConn(conn net.Conn) {
	s.connMu.Lock()
	delete(s.conns, conn)
	s.connMu.Unlock()
}

// handleConnection reads messages from conn with DecodeTCPMessage. It dispatches
// each data message via handleMessage until decoding fails or the subscriber is
// closed. On exit it always closes conn and removes it from the tracked set, so
// finished connections are not retained.
func (s *TCPSubscriber) handleConnection(conn net.Conn) {
	defer s.wg.Done()

	ctx := context.Background()
	remote := conn.RemoteAddr().String()
	defer func() {
		_ = conn.Close() // may already be closed by Close; the error is irrelevant
		s.untrackConn(conn)
		s.logger.InfoContext(ctx, "Connection closed", "remote", remote)
	}()

	for {
		select {
		case <-s.closeChan:
			return
		default:
		}

		tcpMsg, err := DecodeTCPMessage(conn)
		if err != nil {
			// A decode error after Close is expected: Close closes the conn.
			if !s.isClosed() {
				s.logger.ErrorContext(ctx, "Failed to decode message", "error", err)
			}
			return
		}

		if tcpMsg.Type != MessageTypeData {
			s.logger.WarnContext(ctx, "Unexpected message type", "type", tcpMsg.Type)
			continue
		}

		s.handleMessage(ctx, conn, tcpMsg)
	}
}

// handleMessage delivers tcpMsg to one randomly chosen subscription channel of
// its topic (fire-and-forget: no ACK/NACK is sent back to the peer). Messages for
// topics without subscribers are dropped with a warning.
//
// The send blocks while the chosen channel is full. That stalls this
// connection's reader, the TCP receive window fills, and the sender is
// eventually throttled (backpressure). The send is abandoned when the subscriber
// is closed.
func (s *TCPSubscriber) handleMessage(ctx context.Context, conn net.Conn, tcpMsg *TCPMessage) {
	s.logger.DebugContext(ctx, "Received message", "uuid", tcpMsg.UUID, "topic", tcpMsg.Topic)

	s.subsLock.RLock()
	channels := s.subs[tcpMsg.Topic]
	s.subsLock.RUnlock()

	if len(channels) == 0 {
		s.logger.WarnContext(ctx, "No subscribers for topic", "topic", tcpMsg.Topic)
		return
	}

	msg := message.NewMessage(tcpMsg.UUID, tcpMsg.Payload)

	// Random selection spreads load across subscriptions without extra locking.
	randomIndex := rand.Intn(len(channels))
	outputChan := channels[randomIndex]

	// Report channel fill level to make backpressure observable.
	channelLen, channelCap := len(outputChan), cap(outputChan)
	usage := float64(channelLen) / float64(channelCap) * 100
	s.logger.DebugContext(ctx, "Dispatching message via random selection",
		"uuid", tcpMsg.UUID,
		"subscriber_index", randomIndex,
		"total_subscribers", len(channels),
		"channel_usage", fmt.Sprintf("%.1f%% (%d/%d)", usage, channelLen, channelCap))

	// outputChan cannot be closed while this runs: Close waits for every
	// connection handler (wg) before it closes subscription channels.
	select {
	case outputChan <- msg:
		s.logger.DebugContext(ctx, "Message sent to subscriber", "uuid", tcpMsg.UUID, "index", randomIndex)
		// Fire-and-forget: ack as soon as the message is handed over.
		msg.Ack()
	case <-s.closeChan:
		s.logger.DebugContext(ctx, "Subscriber closed, message discarded", "uuid", tcpMsg.UUID)
	}
}

// Subscribe returns a new buffered channel that receives messages sent for topic.
// When a topic has several subscriptions, each message is delivered to only one
// of them, chosen at random. Every returned channel is closed by Close. ctx is
// used only for logging. It returns ErrSubscriberClosed after Close.
func (s *TCPSubscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
	channelSize := s.config.OutputChannelSize
	if channelSize <= 0 {
		channelSize = defaultOutputChannelSize
	}

	s.subsLock.Lock()
	// Check closed while holding subsLock. Close marks the subscriber closed
	// before it takes subsLock to close channels, so a subscription registered
	// here is guaranteed to be closed later by Close.
	if s.isClosed() {
		s.subsLock.Unlock()
		return nil, ErrSubscriberClosed
	}
	output := make(chan *message.Message, channelSize)
	s.subs[topic] = append(s.subs[topic], output)
	subscriberCount := len(s.subs[topic])
	s.subsLock.Unlock()

	s.logger.InfoContext(ctx, "Subscribed to topic",
		"topic", topic,
		"subscriber_count", subscriberCount,
		"channel_size", channelSize)

	return output, nil
}

// Close shuts the subscriber down in this order:
//  1. Stop accepting connections.
//  2. Close all client connections.
//  3. Wait for the accept loop and every connection handler to exit.
//  4. Close all subscription channels.
//
// Only the first call does the work. Later calls return nil immediately, even
// if the first call is still in progress.
func (s *TCPSubscriber) Close() error {
	s.closedMu.Lock()
	if s.closed {
		s.closedMu.Unlock()
		return nil
	}
	s.closed = true
	s.closedMu.Unlock()

	ctx := context.Background()

	// Wake handlers blocked on a full output channel and the accept backoff.
	close(s.closeChan)

	if s.listener != nil {
		if err := s.listener.Close(); err != nil {
			s.logger.ErrorContext(ctx, "Failed to close listener", "error", err)
		}
	}

	// Unblock handlers waiting inside DecodeTCPMessage.
	s.connMu.Lock()
	for conn := range s.conns {
		_ = conn.Close() // best effort; handlers also close their own conn
	}
	s.connMu.Unlock()

	// After this no goroutine can send on a subscription channel, so closing
	// them below cannot panic with "send on closed channel".
	s.wg.Wait()

	s.subsLock.Lock()
	for topic, channels := range s.subs {
		for _, ch := range channels {
			close(ch)
		}
		delete(s.subs, topic)
	}
	s.subsLock.Unlock()

	s.logger.InfoContext(ctx, "TCP Subscriber closed")
	return nil
}