package watermark import ( "database/sql" "github.com/ThreeDotsLabs/watermill" watersql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql" "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/ThreeDotsLabs/watermill/message" "github.com/go-kratos/kratos/v2/log" ) type CQRSBus struct { CommandBus *cqrs.CommandBus EventBus *cqrs.EventBus Router *message.Router } func NewCQRSBus(db *sql.DB, logger log.Logger) (*CQRSBus, error) { helper := log.NewHelper(logger) wmLogger := watermill.NewStdLogger(false, false) publisher, err := watersql.NewPublisher(db, watersql.PublisherConfig{ SchemaAdapter: watersql.DefaultPostgreSQLSchema{}, AutoInitializeSchema: true, }, wmLogger) if err != nil { return nil, err } subscriber, err := watersql.NewSubscriber(db, watersql.SubscriberConfig{ SchemaAdapter: watersql.DefaultPostgreSQLSchema{}, OffsetsAdapter: watersql.DefaultPostgreSQLOffsetsAdapter{}, InitializeSchema: true, }, wmLogger) if err != nil { return nil, err } // subscriber will be used later to wire event handlers on the router _ = subscriber router, err := message.NewRouter(message.RouterConfig{}, wmLogger) if err != nil { return nil, err } cqrsMarshaler := cqrs.JSONMarshaler{} commandBus, err := cqrs.NewCommandBusWithConfig(publisher, cqrs.CommandBusConfig{ GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) { return "commands." + params.CommandName, nil }, Marshaler: cqrsMarshaler, Logger: wmLogger, }) if err != nil { return nil, err } eventBus, err := cqrs.NewEventBusWithConfig(publisher, cqrs.EventBusConfig{ GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { return "events." + params.EventName, nil }, Marshaler: cqrsMarshaler, Logger: wmLogger, }) if err != nil { return nil, err } helper.Info("Watermill CQRS bus initialized with PostgreSQL") return &CQRSBus{ CommandBus: commandBus, EventBus: eventBus, Router: router, }, nil }