feat: wire Watermill CQRS — EventBus in usecases, event handlers, router lifecycle

- Add EventPublisher interface in biz layer for domain event publishing
- Wire EventBusPublisher (Watermill EventBus adapter) into FileUsecase, FolderUsecase, ShareUsecase
- Publish events after UploadFile, DeleteFile, CreateFolder, DeleteFolder, CreateShare
- Implement CQRSHandler with logging event handlers for all 6 event types
- Register event handlers via CQRSBus.RegisterHandlers using Watermill EventProcessor
- Store subscriber and wmLogger in CQRSBus for EventProcessor wiring
- Expose SqlDB() on Data struct for Watermill SQL pub/sub
- Start Watermill router in goroutine alongside Kratos app with graceful close
- Use appContext wrapper struct to pass CQRSBus through Wire DI graph
This commit is contained in:
向宁 2026-05-25 13:52:05 +08:00
parent 3eb1a1839d
commit 11315fd00b
11 changed files with 278 additions and 23 deletions

View File

@ -1,6 +1,7 @@
package main
import (
"context"
"flag"
"os"
@ -42,13 +43,23 @@ func main() {
panic(err)
}
app, cleanup, err := initApp(&bc, logger)
ctx, cleanup, err := initApp(&bc, logger)
if err != nil {
panic(err)
}
defer cleanup()
if err := app.Run(); err != nil {
// Start Watermill router alongside the Kratos app.
if ctx.CQRSBus != nil {
go func() {
if err := ctx.CQRSBus.Router.Run(context.Background()); err != nil {
logger.Log(log.LevelError, "msg", "watermill router error", "error", err)
}
}()
defer ctx.CQRSBus.Router.Close()
}
if err := ctx.App.Run(); err != nil {
panic(err)
}
}

View File

@ -3,11 +3,14 @@
package main
import (
"database/sql"
"rag/file-system/internal/biz"
"rag/file-system/internal/conf"
"rag/file-system/internal/data"
"rag/file-system/internal/server"
"rag/file-system/internal/service"
"rag/file-system/internal/watermark"
"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/log"
@ -16,6 +19,12 @@ import (
"github.com/google/wire"
)
// appContext holds the Kratos app and CQRS bus together for Wire.
type appContext struct {
App *kratos.App
CQRSBus *watermark.CQRSBus
}
// newApp creates a new Kratos application with HTTP and gRPC servers.
func newApp(logger log.Logger, hs *http.Server, gs *grpc.Server) *kratos.App {
return kratos.New(
@ -25,6 +34,11 @@ func newApp(logger log.Logger, hs *http.Server, gs *grpc.Server) *kratos.App {
)
}
// newAppContext wraps the app and CQRSBus into an appContext.
func newAppContext(app *kratos.App, bus *watermark.CQRSBus) *appContext {
return &appContext{App: app, CQRSBus: bus}
}
// newConfServer extracts the Server config from Bootstrap.
func newConfServer(bc *conf.Bootstrap) *conf.Server {
return bc.GetServer()
@ -40,8 +54,13 @@ func newConfAuth(bc *conf.Bootstrap) *conf.Auth {
return bc.GetAuth()
}
// newSQLDB extracts the underlying *sql.DB from Data for Watermill.
func newSQLDB(d *data.Data) (*sql.DB, error) {
return d.SqlDB()
}
// initApp wires up the entire dependency graph.
func initApp(*conf.Bootstrap, log.Logger) (*kratos.App, func(), error) {
func initApp(*conf.Bootstrap, log.Logger) (*appContext, func(), error) {
panic(wire.Build(
// Config extraction
newConfServer,
@ -53,15 +72,23 @@ func initApp(*conf.Bootstrap, log.Logger) (*kratos.App, func(), error) {
biz.ProviderSet,
service.ProviderSet,
server.ProviderSet,
watermark.ProviderSet,
// Interface bindings: biz interfaces → data implementations
// Extract *sql.DB from Data for Watermill
newSQLDB,
// Interface bindings: biz interfaces -> data implementations
wire.Bind(new(biz.FileRepo), new(*data.FileRepo)),
wire.Bind(new(biz.FolderRepo), new(*data.FolderRepo)),
wire.Bind(new(biz.FileMetaRepo), new(*data.FileMetaRepo)),
wire.Bind(new(biz.FileMetaRepoForShare), new(*data.FileMetaRepo)),
wire.Bind(new(biz.ShareRepo), new(*data.ShareRepo)),
// App constructor
// Interface binding: biz EventPublisher -> watermark EventBusPublisher
wire.Bind(new(biz.EventPublisher), new(*watermark.EventBusPublisher)),
// Wire up app + CQRSBus into appContext
newAppContext,
newApp,
))
}

View File

@ -7,6 +7,7 @@
package main
import (
"database/sql"
"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport/grpc"
@ -16,43 +17,68 @@ import (
"rag/file-system/internal/data"
"rag/file-system/internal/server"
"rag/file-system/internal/service"
"rag/file-system/internal/watermark"
)
// Injectors from wire.go:
// initApp wires up the entire dependency graph.
func initApp(bootstrap *conf.Bootstrap, logger log.Logger) (*kratos.App, func(), error) {
func initApp(bootstrap *conf.Bootstrap, logger log.Logger) (*appContext, func(), error) {
confServer := newConfServer(bootstrap)
auth := newConfAuth(bootstrap)
confData := newConfData(bootstrap)
fileRepo := data.NewFileRepo(confData)
fileUsecase := biz.NewFileUsecase(fileRepo, logger)
bucketUsecase := biz.NewBucketUsecase(fileRepo, logger)
dataData, cleanup, err := data.NewData(confData, logger)
if err != nil {
return nil, nil, err
}
db, err := newSQLDB(dataData)
if err != nil {
cleanup()
return nil, nil, err
}
cqrsHandler := watermark.NewCQRSHandler(logger)
cqrsBus, err := watermark.NewCQRSBusWithHandlers(db, cqrsHandler, logger)
if err != nil {
cleanup()
return nil, nil, err
}
eventBusPublisher := watermark.NewEventBusPublisher(cqrsBus)
fileUsecase := biz.NewFileUsecase(fileRepo, eventBusPublisher, logger)
bucketUsecase := biz.NewBucketUsecase(fileRepo, logger)
folderRepo := data.NewFolderRepo(dataData, logger)
fileMetaRepo := data.NewFileMetaRepo(dataData, logger)
folderUsecase := biz.NewFolderUsecase(folderRepo, fileMetaRepo, fileRepo, logger)
folderUsecase := biz.NewFolderUsecase(folderRepo, fileMetaRepo, fileRepo, eventBusPublisher, logger)
shareRepo := data.NewShareRepo(dataData, logger)
shareUsecase := biz.NewShareUsecase(shareRepo, fileMetaRepo, fileRepo, logger)
shareUsecase := biz.NewShareUsecase(shareRepo, fileMetaRepo, fileRepo, eventBusPublisher, logger)
fileService := service.NewFileService(fileUsecase, bucketUsecase, folderUsecase, shareUsecase, logger)
httpServer := server.NewHTTPServer(confServer, auth, fileService, logger)
grpcServer := server.NewGRPCServer(confServer, auth, fileService, logger)
app := newApp(logger, httpServer, grpcServer)
return app, func() {
mainAppContext := newAppContext(app, cqrsBus)
return mainAppContext, func() {
cleanup()
}, nil
}
// wire.go:
// appContext holds the Kratos app and CQRS bus together for Wire.
type appContext struct {
App *kratos.App
CQRSBus *watermark.CQRSBus
}
// newApp creates a new Kratos application with HTTP and gRPC servers.
func newApp(logger log.Logger, hs *http.Server, gs *grpc.Server) *kratos.App {
return kratos.New(kratos.Name("file-system"), kratos.Logger(logger), kratos.Server(hs, gs))
}
// newAppContext wraps the app and CQRSBus into an appContext.
func newAppContext(app *kratos.App, bus *watermark.CQRSBus) *appContext {
return &appContext{App: app, CQRSBus: bus}
}
// newConfServer extracts the Server config from Bootstrap.
func newConfServer(bc *conf.Bootstrap) *conf.Server {
return bc.GetServer()
@ -67,3 +93,8 @@ func newConfData(bc *conf.Bootstrap) *conf.Data {
func newConfAuth(bc *conf.Bootstrap) *conf.Auth {
return bc.GetAuth()
}
// newSQLDB extracts the underlying *sql.DB from Data for Watermill.
func newSQLDB(d *data.Data) (*sql.DB, error) {
return d.SqlDB()
}

8
internal/biz/event.go Normal file
View File

@ -0,0 +1,8 @@
package biz
import "context"
// EventPublisher defines the interface for publishing domain events.
type EventPublisher interface {
Publish(ctx context.Context, event interface{}) error
}

View File

@ -6,6 +6,7 @@ import (
"time"
"rag/file-system/internal/data"
"rag/file-system/internal/watermark"
"github.com/go-kratos/kratos/v2/log"
)
@ -29,21 +30,33 @@ type FileRepo interface {
// FileUsecase wraps FileRepo and provides file-level business operations.
type FileUsecase struct {
repo FileRepo
log *log.Helper
repo FileRepo
eventPub EventPublisher
log *log.Helper
}
// NewFileUsecase creates a new FileUsecase.
func NewFileUsecase(repo FileRepo, logger log.Logger) *FileUsecase {
func NewFileUsecase(repo FileRepo, eventPub EventPublisher, logger log.Logger) *FileUsecase {
return &FileUsecase{
repo: repo,
log: log.NewHelper(logger),
repo: repo,
eventPub: eventPub,
log: log.NewHelper(logger),
}
}
// UploadFile uploads data to the specified bucket and key.
func (uc *FileUsecase) UploadFile(ctx context.Context, bucket, key string, fileData io.Reader) error {
return uc.repo.UploadFile(ctx, bucket, key, fileData)
if err := uc.repo.UploadFile(ctx, bucket, key, fileData); err != nil {
return err
}
// Publish domain event on success.
if err := uc.eventPub.Publish(ctx, &watermark.FileUploadedEvent{
BucketName: bucket,
ObjectKey: key,
}); err != nil {
uc.log.Errorf("failed to publish FileUploadedEvent: %v", err)
}
return nil
}
// DownloadFile downloads an object from S3.
@ -58,7 +71,17 @@ func (uc *FileUsecase) GetFileContent(ctx context.Context, bucket, key string) (
// DeleteFile removes a file from S3.
func (uc *FileUsecase) DeleteFile(ctx context.Context, bucket, key string) error {
return uc.repo.DeleteFile(ctx, bucket, key)
if err := uc.repo.DeleteFile(ctx, bucket, key); err != nil {
return err
}
// Publish domain event on success.
if err := uc.eventPub.Publish(ctx, &watermark.FileDeletedEvent{
BucketName: bucket,
ObjectKey: key,
}); err != nil {
uc.log.Errorf("failed to publish FileDeletedEvent: %v", err)
}
return nil
}
// ListObjectsV2 lists files with pagination support.

View File

@ -8,6 +8,7 @@ import (
"rag/file-system/internal/data"
"rag/file-system/internal/pkg/sanitize"
"rag/file-system/internal/watermark"
"github.com/go-kratos/kratos/v2/log"
"github.com/google/uuid"
@ -38,15 +39,17 @@ type FolderUsecase struct {
folderRepo FolderRepo
fileRepo FileMetaRepo
s3Repo FileRepo
eventPub EventPublisher
log *log.Helper
}
// NewFolderUsecase creates a new FolderUsecase.
func NewFolderUsecase(folderRepo FolderRepo, fileRepo FileMetaRepo, s3Repo FileRepo, logger log.Logger) *FolderUsecase {
func NewFolderUsecase(folderRepo FolderRepo, fileRepo FileMetaRepo, s3Repo FileRepo, eventPub EventPublisher, logger log.Logger) *FolderUsecase {
return &FolderUsecase{
folderRepo: folderRepo,
fileRepo: fileRepo,
s3Repo: s3Repo,
eventPub: eventPub,
log: log.NewHelper(logger),
}
}
@ -61,6 +64,14 @@ func (uc *FolderUsecase) CreateFolder(ctx context.Context, parentID *string, nam
if err := uc.folderRepo.Create(ctx, folder); err != nil {
return nil, fmt.Errorf("failed to create folder: %w", err)
}
// Publish domain event on success.
if err := uc.eventPub.Publish(ctx, &watermark.FolderCreatedEvent{
FolderID: folder.ID,
Name: folder.Name,
OwnerID: folder.OwnerID,
}); err != nil {
uc.log.Errorf("failed to publish FolderCreatedEvent: %v", err)
}
return folder, nil
}
@ -118,6 +129,15 @@ func (uc *FolderUsecase) DeleteFolder(ctx context.Context, id, ownerID string) e
if err := uc.folderRepo.Delete(ctx, id, ownerID); err != nil {
return fmt.Errorf("failed to delete folder: %w", err)
}
// Publish domain event on success.
if err := uc.eventPub.Publish(ctx, &watermark.FolderDeletedEvent{
FolderID: id,
OwnerID: ownerID,
}); err != nil {
uc.log.Errorf("failed to publish FolderDeletedEvent: %v", err)
}
return nil
}

View File

@ -8,6 +8,7 @@ import (
"time"
"rag/file-system/internal/data"
"rag/file-system/internal/watermark"
"github.com/go-kratos/kratos/v2/log"
)
@ -32,15 +33,17 @@ type ShareUsecase struct {
shareRepo ShareRepo
fileRepo FileMetaRepoForShare
s3Repo FileRepo
eventPub EventPublisher
log *log.Helper
}
// NewShareUsecase creates a new ShareUsecase.
func NewShareUsecase(shareRepo ShareRepo, fileRepo FileMetaRepoForShare, s3Repo FileRepo, logger log.Logger) *ShareUsecase {
func NewShareUsecase(shareRepo ShareRepo, fileRepo FileMetaRepoForShare, s3Repo FileRepo, eventPub EventPublisher, logger log.Logger) *ShareUsecase {
return &ShareUsecase{
shareRepo: shareRepo,
fileRepo: fileRepo,
s3Repo: s3Repo,
eventPub: eventPub,
log: log.NewHelper(logger),
}
}
@ -68,6 +71,18 @@ func (uc *ShareUsecase) CreateShare(ctx context.Context, resourceType, resourceI
if err := uc.shareRepo.Create(ctx, share); err != nil {
return nil, fmt.Errorf("failed to create share link: %w", err)
}
// Publish domain event on success.
if err := uc.eventPub.Publish(ctx, &watermark.ShareCreatedEvent{
ShareID: share.ID,
ResourceType: share.ResourceType,
ResourceID: share.ResourceID,
Token: share.Token,
CreatedBy: share.CreatedBy,
}); err != nil {
uc.log.Errorf("failed to publish ShareCreatedEvent: %v", err)
}
return share, nil
}

View File

@ -2,6 +2,7 @@ package data
import (
"context"
"database/sql"
"time"
"rag/file-system/internal/conf"
@ -47,6 +48,11 @@ func NewData(c *conf.Data, logger log.Logger) (*Data, func(), error) {
return &Data{db: db, log: helper}, cleanup, nil
}
// SqlDB returns the underlying *sql.DB for use by Watermill.
func (d *Data) SqlDB() (*sql.DB, error) {
return d.db.DB()
}
// DB returns the *gorm.DB for the given context. If a transaction is active
// in the context, it returns the transaction DB; otherwise the global DB.
func (d *Data) DB(ctx context.Context) *gorm.DB {

View File

@ -8,14 +8,19 @@ import (
"github.com/ThreeDotsLabs/watermill/components/cqrs"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/go-kratos/kratos/v2/log"
"github.com/google/wire"
)
// CQRSBus holds the Watermill CQRS buses, router, and internal references.
type CQRSBus struct {
CommandBus *cqrs.CommandBus
EventBus *cqrs.EventBus
Router *message.Router
subscriber message.Subscriber
wmLogger watermill.LoggerAdapter
}
// NewCQRSBus creates a new CQRSBus with PostgreSQL-backed pub/sub.
func NewCQRSBus(db *sql.DB, logger log.Logger) (*CQRSBus, error) {
helper := log.NewHelper(logger)
wmLogger := watermill.NewStdLogger(false, false)
@ -37,9 +42,6 @@ func NewCQRSBus(db *sql.DB, logger log.Logger) (*CQRSBus, error) {
return nil, err
}
// subscriber will be used later to wire event handlers on the router
_ = subscriber
router, err := message.NewRouter(message.RouterConfig{}, wmLogger)
if err != nil {
return nil, err
@ -74,5 +76,56 @@ func NewCQRSBus(db *sql.DB, logger log.Logger) (*CQRSBus, error) {
CommandBus: commandBus,
EventBus: eventBus,
Router: router,
subscriber: subscriber,
wmLogger: wmLogger,
}, nil
}
// RegisterHandlers sets up all event handlers on the router.
func (b *CQRSBus) RegisterHandlers(handler *CQRSHandler) error {
eventProcessor, err := cqrs.NewEventProcessorWithConfig(
b.Router,
cqrs.EventProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) {
return "events." + params.EventName, nil
},
SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) {
return b.subscriber, nil
},
Marshaler: cqrs.JSONMarshaler{},
Logger: b.wmLogger,
},
)
if err != nil {
return err
}
err = eventProcessor.AddHandlers(
cqrs.NewEventHandler("OnFileUploaded", handler.OnFileUploaded),
cqrs.NewEventHandler("OnFileDeleted", handler.OnFileDeleted),
cqrs.NewEventHandler("OnFolderCreated", handler.OnFolderCreated),
cqrs.NewEventHandler("OnFolderDeleted", handler.OnFolderDeleted),
cqrs.NewEventHandler("OnShareCreated", handler.OnShareCreated),
cqrs.NewEventHandler("OnShareDownloaded", handler.OnShareDownloaded),
)
if err != nil {
return err
}
return nil
}
// ProviderSet is the Wire provider set for the watermark layer.
var ProviderSet = wire.NewSet(NewCQRSBusWithHandlers, NewCQRSHandler, NewEventBusPublisher)
// NewCQRSBusWithHandlers creates the CQRSBus and registers all event handlers.
func NewCQRSBusWithHandlers(db *sql.DB, handler *CQRSHandler, logger log.Logger) (*CQRSBus, error) {
bus, err := NewCQRSBus(db, logger)
if err != nil {
return nil, err
}
if err := bus.RegisterHandlers(handler); err != nil {
return nil, err
}
return bus, nil
}

View File

@ -1,13 +1,54 @@
package watermark
import (
"context"
"github.com/go-kratos/kratos/v2/log"
)
// CQRSHandler holds all Watermill event handlers.
type CQRSHandler struct {
log *log.Helper
}
// NewCQRSHandler creates a new CQRSHandler.
func NewCQRSHandler(logger log.Logger) *CQRSHandler {
return &CQRSHandler{log: log.NewHelper(logger)}
}
// OnFileUploaded handles the FileUploadedEvent.
func (h *CQRSHandler) OnFileUploaded(ctx context.Context, event *FileUploadedEvent) error {
h.log.Infof("event: file uploaded, bucket=%s key=%s size=%d", event.BucketName, event.ObjectKey, event.Size)
return nil
}
// OnFileDeleted handles the FileDeletedEvent.
func (h *CQRSHandler) OnFileDeleted(ctx context.Context, event *FileDeletedEvent) error {
h.log.Infof("event: file deleted, bucket=%s key=%s", event.BucketName, event.ObjectKey)
return nil
}
// OnFolderCreated handles the FolderCreatedEvent.
func (h *CQRSHandler) OnFolderCreated(ctx context.Context, event *FolderCreatedEvent) error {
h.log.Infof("event: folder created, id=%s name=%s owner=%s", event.FolderID, event.Name, event.OwnerID)
return nil
}
// OnFolderDeleted handles the FolderDeletedEvent.
func (h *CQRSHandler) OnFolderDeleted(ctx context.Context, event *FolderDeletedEvent) error {
h.log.Infof("event: folder deleted, id=%s owner=%s", event.FolderID, event.OwnerID)
return nil
}
// OnShareCreated handles the ShareCreatedEvent.
func (h *CQRSHandler) OnShareCreated(ctx context.Context, event *ShareCreatedEvent) error {
h.log.Infof("event: share created, id=%s token=%s resource=%s/%s by=%s",
event.ShareID, event.Token, event.ResourceType, event.ResourceID, event.CreatedBy)
return nil
}
// OnShareDownloaded handles the ShareDownloadedEvent.
func (h *CQRSHandler) OnShareDownloaded(ctx context.Context, event *ShareDownloadedEvent) error {
h.log.Infof("event: share downloaded, token=%s", event.Token)
return nil
}

View File

@ -0,0 +1,20 @@
package watermark
import (
"context"
)
// EventBusPublisher implements biz.EventPublisher using Watermill EventBus.
type EventBusPublisher struct {
bus *CQRSBus
}
// NewEventBusPublisher creates a new EventBusPublisher from a CQRSBus.
func NewEventBusPublisher(bus *CQRSBus) *EventBusPublisher {
return &EventBusPublisher{bus: bus}
}
// Publish publishes a domain event via the Watermill EventBus.
func (p *EventBusPublisher) Publish(ctx context.Context, event interface{}) error {
return p.bus.EventBus.Publish(ctx, event)
}