package watermark import ( "database/sql" "github.com/ThreeDotsLabs/watermill" watersql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql" "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/ThreeDotsLabs/watermill/message" "github.com/go-kratos/kratos/v2/log" "github.com/google/wire" ) // CQRSBus holds the Watermill CQRS buses, router, and internal references. type CQRSBus struct { CommandBus *cqrs.CommandBus EventBus *cqrs.EventBus Router *message.Router subscriber message.Subscriber wmLogger watermill.LoggerAdapter } // NewCQRSBus creates a new CQRSBus with PostgreSQL-backed pub/sub. func NewCQRSBus(db *sql.DB, logger log.Logger) (*CQRSBus, error) { helper := log.NewHelper(logger) wmLogger := watermill.NewStdLogger(false, false) publisher, err := watersql.NewPublisher(db, watersql.PublisherConfig{ SchemaAdapter: watersql.DefaultPostgreSQLSchema{}, AutoInitializeSchema: true, }, wmLogger) if err != nil { return nil, err } subscriber, err := watersql.NewSubscriber(db, watersql.SubscriberConfig{ SchemaAdapter: watersql.DefaultPostgreSQLSchema{}, OffsetsAdapter: watersql.DefaultPostgreSQLOffsetsAdapter{}, InitializeSchema: true, }, wmLogger) if err != nil { return nil, err } router, err := message.NewRouter(message.RouterConfig{}, wmLogger) if err != nil { return nil, err } cqrsMarshaler := cqrs.JSONMarshaler{} commandBus, err := cqrs.NewCommandBusWithConfig(publisher, cqrs.CommandBusConfig{ GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) { return "commands." + params.CommandName, nil }, Marshaler: cqrsMarshaler, Logger: wmLogger, }) if err != nil { return nil, err } eventBus, err := cqrs.NewEventBusWithConfig(publisher, cqrs.EventBusConfig{ GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { return "events." + params.EventName, nil }, Marshaler: cqrsMarshaler, Logger: wmLogger, }) if err != nil { return nil, err } helper.Info("Watermill CQRS bus initialized with PostgreSQL") return &CQRSBus{ CommandBus: commandBus, EventBus: eventBus, Router: router, subscriber: subscriber, wmLogger: wmLogger, }, nil } // RegisterHandlers sets up all event handlers on the router. func (b *CQRSBus) RegisterHandlers(handler *CQRSHandler) error { eventProcessor, err := cqrs.NewEventProcessorWithConfig( b.Router, cqrs.EventProcessorConfig{ GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) { return "events." + params.EventName, nil }, SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { return b.subscriber, nil }, Marshaler: cqrs.JSONMarshaler{}, Logger: b.wmLogger, }, ) if err != nil { return err } err = eventProcessor.AddHandlers( cqrs.NewEventHandler("OnFileUploaded", handler.OnFileUploaded), cqrs.NewEventHandler("OnFileDeleted", handler.OnFileDeleted), cqrs.NewEventHandler("OnFolderCreated", handler.OnFolderCreated), cqrs.NewEventHandler("OnFolderDeleted", handler.OnFolderDeleted), cqrs.NewEventHandler("OnShareCreated", handler.OnShareCreated), cqrs.NewEventHandler("OnShareDownloaded", handler.OnShareDownloaded), ) if err != nil { return err } return nil } // ProviderSet is the Wire provider set for the watermark layer. var ProviderSet = wire.NewSet(NewCQRSBusWithHandlers, NewCQRSHandler, NewEventBusPublisher) // NewCQRSBusWithHandlers creates the CQRSBus and registers all event handlers. func NewCQRSBusWithHandlers(db *sql.DB, handler *CQRSHandler, logger log.Logger) (*CQRSBus, error) { bus, err := NewCQRSBus(db, logger) if err != nil { return nil, err } if err := bus.RegisterHandlers(handler); err != nil { return nil, err } return bus, nil }