package adapter import ( "context" "errors" "fmt" "sync" "time" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" "github.com/apache/pulsar-client-go/pulsar" "go.yandata.net/iod/iod/trustlog-sdk/api/logger" logger2 "go.yandata.net/iod/iod/trustlog-sdk/internal/logger" ) const ( SubNameKey contextKey = "subName" ReceiverQueueSizeKey contextKey = "receiverQueueSize" IndexKey contextKey = "index" ReceiverQueueSizeDefault = 1000 SubNameDefault = "subName" TimeOutDefault = time.Second * 10 defaultMessageChannelSize = 10 ) type contextKey string var _ message.Subscriber = &Subscriber{} // SubscriberConfig is the configuration to create a subscriber. type SubscriberConfig struct { // URL is the URL to the broker URL string // SubscriberName is the name of the subscription. SubscriberName string // SubscriberType is the type of the subscription. SubscriberType pulsar.SubscriptionType // TLSTrustCertsFilePath is the path to the CA certificate file for verifying the server certificate. // If empty, TLS verification will be disabled. TLSTrustCertsFilePath string // TLSCertificateFilePath is the path to the client certificate file for mTLS authentication. // If empty, mTLS authentication will be disabled. TLSCertificateFilePath string // TLSKeyFilePath is the path to the client private key file for mTLS authentication. // If empty, mTLS authentication will be disabled. TLSKeyFilePath string // TLSAllowInsecureConnection allows insecure TLS connections (not recommended for production). TLSAllowInsecureConnection bool } // Subscriber provides the pulsar implementation for watermill subscribe operations. type Subscriber struct { conn pulsar.Client logger logger.Logger subsLock sync.RWMutex // Change to map with composite key: topic + subscriptionName + subName subs map[string]pulsar.Consumer closed bool closing chan struct{} SubscribersCount int clientID string config SubscriberConfig } // NewSubscriber creates a new Subscriber. func NewSubscriber(config SubscriberConfig, adapter logger.Logger) (*Subscriber, error) { clientOptions := pulsar.ClientOptions{ URL: config.URL, Logger: logger2.NewPulsarLoggerAdapter(adapter), } // Configure TLS/mTLS if err := configureTLSForClient(&clientOptions, config, adapter); err != nil { return nil, errors.Join(err, errors.New("failed to configure TLS")) } conn, err := pulsar.NewClient(clientOptions) if err != nil { return nil, errors.Join(err, errors.New("cannot connect to Pulsar")) } return NewSubscriberWithPulsarClient(conn, config, adapter) } // NewSubscriberWithPulsarClient creates a new Subscriber with the provided pulsar client. func NewSubscriberWithPulsarClient( conn pulsar.Client, config SubscriberConfig, logger logger.Logger, ) (*Subscriber, error) { return &Subscriber{ conn: conn, logger: logger, closing: make(chan struct{}), clientID: watermill.NewULID(), subs: make(map[string]pulsar.Consumer), config: config, }, nil } // Subscribe subscribes messages from Pulsar. func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) { output := make(chan *message.Message) s.subsLock.Lock() subName, ok := ctx.Value(SubNameKey).(string) if !ok { subName = SubNameDefault } index, ok := ctx.Value(IndexKey).(int) if !ok { index = 0 } receiverQueueSize, ok := ctx.Value(ReceiverQueueSizeKey).(int) if !ok { receiverQueueSize = ReceiverQueueSizeDefault } subscriptionName := fmt.Sprintf("%s-%s", topic, s.clientID) if s.config.SubscriberName != "" { subscriptionName = s.config.SubscriberName } sn := fmt.Sprintf("%s_%s", subscriptionName, subName) n := fmt.Sprintf("%s_%d", sn, index) sub, found := s.subs[n] if !found { subscribeCtx, cancel := context.WithTimeout(ctx, TimeOutDefault) defer cancel() done := make(chan struct{}) var sb pulsar.Consumer var err error go func() { defer close(done) sb, err = s.conn.Subscribe(pulsar.ConsumerOptions{ Topic: topic, Name: n, SubscriptionName: sn, Type: s.config.SubscriberType, MessageChannel: make(chan pulsar.ConsumerMessage, defaultMessageChannelSize), ReceiverQueueSize: receiverQueueSize, }) }() select { case <-subscribeCtx.Done(): s.subsLock.Unlock() return nil, fmt.Errorf("subscription timeout: %w", subscribeCtx.Err()) case <-done: if err != nil { s.subsLock.Unlock() return nil, fmt.Errorf("subscription failed: %w", err) } } s.subs[n] = sb sub = sb } s.subsLock.Unlock() // 创建本地引用以避免竞态条件 localSub := sub go func() { for { select { case <-s.closing: s.logger.InfoContext(ctx, "subscriber is closing") return case <-ctx.Done(): s.logger.InfoContext(ctx, "exiting on context closure") return case m, msgOk := <-localSub.Chan(): if !msgOk { // Channel closed, exit the loop s.logger.InfoContext(ctx, "consumer channel closed") return } go s.processMessage(ctx, output, m, localSub) } } }() return output, nil } func (s *Subscriber) processMessage( ctx context.Context, output chan *message.Message, m pulsar.Message, sub pulsar.Consumer, ) { if s.isClosed() { return } s.logger.DebugContext(ctx, "Received message", "key", m.Key()) ctx, cancelCtx := context.WithCancel(ctx) defer cancelCtx() msg := message.NewMessage(m.Key(), m.Payload()) select { case <-s.closing: s.logger.DebugContext(ctx, "Closing, message discarded", "key", m.Key()) return case <-ctx.Done(): s.logger.DebugContext(ctx, "Context cancelled, message discarded") return // if this is first can risk 'send on closed channel' errors case output <- msg: s.logger.DebugContext(ctx, "Message sent to consumer") } select { case <-msg.Acked(): err := sub.Ack(m) if err != nil { s.logger.DebugContext(ctx, "Message Ack Failed") } s.logger.DebugContext(ctx, "Message Acked") case <-msg.Nacked(): sub.Nack(m) s.logger.DebugContext(ctx, "Message Nacked") case <-s.closing: s.logger.DebugContext(ctx, "Closing, message discarded before ack") return case <-ctx.Done(): s.logger.DebugContext(ctx, "Context cancelled, message discarded before ack") return } } // Close closes the publisher and the underlying connection. It will attempt to wait for in-flight messages to complete. func (s *Subscriber) Close() error { s.subsLock.Lock() defer s.subsLock.Unlock() if s.closed { return nil } s.closed = true s.logger.DebugContext(context.Background(), "Closing subscriber") defer s.logger.InfoContext(context.Background(), "Subscriber closed") close(s.closing) for _, sub := range s.subs { sub.Close() } s.conn.Close() return nil } func (s *Subscriber) isClosed() bool { s.subsLock.RLock() defer s.subsLock.RUnlock() return s.closed }