diff --git a/cmd/server/main.go b/cmd/server/main.go index 98f2ca0..c098531 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "os" @@ -42,13 +43,23 @@ func main() { panic(err) } - app, cleanup, err := initApp(&bc, logger) + ctx, cleanup, err := initApp(&bc, logger) if err != nil { panic(err) } defer cleanup() - if err := app.Run(); err != nil { + // Start Watermill router alongside the Kratos app. + if ctx.CQRSBus != nil { + go func() { + if err := ctx.CQRSBus.Router.Run(context.Background()); err != nil { + logger.Log(log.LevelError, "msg", "watermill router error", "error", err) + } + }() + defer ctx.CQRSBus.Router.Close() + } + + if err := ctx.App.Run(); err != nil { panic(err) } } diff --git a/cmd/server/wire.go b/cmd/server/wire.go index 2d37806..30360a6 100644 --- a/cmd/server/wire.go +++ b/cmd/server/wire.go @@ -3,11 +3,14 @@ package main import ( + "database/sql" + "rag/file-system/internal/biz" "rag/file-system/internal/conf" "rag/file-system/internal/data" "rag/file-system/internal/server" "rag/file-system/internal/service" + "rag/file-system/internal/watermark" "github.com/go-kratos/kratos/v2" "github.com/go-kratos/kratos/v2/log" @@ -16,6 +19,12 @@ import ( "github.com/google/wire" ) +// appContext holds the Kratos app and CQRS bus together for Wire. +type appContext struct { + App *kratos.App + CQRSBus *watermark.CQRSBus +} + // newApp creates a new Kratos application with HTTP and gRPC servers. func newApp(logger log.Logger, hs *http.Server, gs *grpc.Server) *kratos.App { return kratos.New( @@ -25,6 +34,11 @@ func newApp(logger log.Logger, hs *http.Server, gs *grpc.Server) *kratos.App { ) } +// newAppContext wraps the app and CQRSBus into an appContext. +func newAppContext(app *kratos.App, bus *watermark.CQRSBus) *appContext { + return &appContext{App: app, CQRSBus: bus} +} + // newConfServer extracts the Server config from Bootstrap. func newConfServer(bc *conf.Bootstrap) *conf.Server { return bc.GetServer() @@ -40,8 +54,13 @@ func newConfAuth(bc *conf.Bootstrap) *conf.Auth { return bc.GetAuth() } +// newSQLDB extracts the underlying *sql.DB from Data for Watermill. +func newSQLDB(d *data.Data) (*sql.DB, error) { + return d.SqlDB() +} + // initApp wires up the entire dependency graph. -func initApp(*conf.Bootstrap, log.Logger) (*kratos.App, func(), error) { +func initApp(*conf.Bootstrap, log.Logger) (*appContext, func(), error) { panic(wire.Build( // Config extraction newConfServer, @@ -53,15 +72,23 @@ func initApp(*conf.Bootstrap, log.Logger) (*kratos.App, func(), error) { biz.ProviderSet, service.ProviderSet, server.ProviderSet, + watermark.ProviderSet, - // Interface bindings: biz interfaces → data implementations + // Extract *sql.DB from Data for Watermill + newSQLDB, + + // Interface bindings: biz interfaces -> data implementations wire.Bind(new(biz.FileRepo), new(*data.FileRepo)), wire.Bind(new(biz.FolderRepo), new(*data.FolderRepo)), wire.Bind(new(biz.FileMetaRepo), new(*data.FileMetaRepo)), wire.Bind(new(biz.FileMetaRepoForShare), new(*data.FileMetaRepo)), wire.Bind(new(biz.ShareRepo), new(*data.ShareRepo)), - // App constructor + // Interface binding: biz EventPublisher -> watermark EventBusPublisher + wire.Bind(new(biz.EventPublisher), new(*watermark.EventBusPublisher)), + + // Wire up app + CQRSBus into appContext + newAppContext, newApp, )) } diff --git a/cmd/server/wire_gen.go b/cmd/server/wire_gen.go index af6056a..890e9f1 100644 --- a/cmd/server/wire_gen.go +++ b/cmd/server/wire_gen.go @@ -7,6 +7,7 @@ package main import ( + "database/sql" "github.com/go-kratos/kratos/v2" "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/transport/grpc" @@ -16,43 +17,68 @@ import ( "rag/file-system/internal/data" "rag/file-system/internal/server" "rag/file-system/internal/service" + "rag/file-system/internal/watermark" ) // Injectors from wire.go: // initApp wires up the entire dependency graph. -func initApp(bootstrap *conf.Bootstrap, logger log.Logger) (*kratos.App, func(), error) { +func initApp(bootstrap *conf.Bootstrap, logger log.Logger) (*appContext, func(), error) { confServer := newConfServer(bootstrap) auth := newConfAuth(bootstrap) confData := newConfData(bootstrap) fileRepo := data.NewFileRepo(confData) - fileUsecase := biz.NewFileUsecase(fileRepo, logger) - bucketUsecase := biz.NewBucketUsecase(fileRepo, logger) dataData, cleanup, err := data.NewData(confData, logger) if err != nil { return nil, nil, err } + db, err := newSQLDB(dataData) + if err != nil { + cleanup() + return nil, nil, err + } + cqrsHandler := watermark.NewCQRSHandler(logger) + cqrsBus, err := watermark.NewCQRSBusWithHandlers(db, cqrsHandler, logger) + if err != nil { + cleanup() + return nil, nil, err + } + eventBusPublisher := watermark.NewEventBusPublisher(cqrsBus) + fileUsecase := biz.NewFileUsecase(fileRepo, eventBusPublisher, logger) + bucketUsecase := biz.NewBucketUsecase(fileRepo, logger) folderRepo := data.NewFolderRepo(dataData, logger) fileMetaRepo := data.NewFileMetaRepo(dataData, logger) - folderUsecase := biz.NewFolderUsecase(folderRepo, fileMetaRepo, fileRepo, logger) + folderUsecase := biz.NewFolderUsecase(folderRepo, fileMetaRepo, fileRepo, eventBusPublisher, logger) shareRepo := data.NewShareRepo(dataData, logger) - shareUsecase := biz.NewShareUsecase(shareRepo, fileMetaRepo, fileRepo, logger) + shareUsecase := biz.NewShareUsecase(shareRepo, fileMetaRepo, fileRepo, eventBusPublisher, logger) fileService := service.NewFileService(fileUsecase, bucketUsecase, folderUsecase, shareUsecase, logger) httpServer := server.NewHTTPServer(confServer, auth, fileService, logger) grpcServer := server.NewGRPCServer(confServer, auth, fileService, logger) app := newApp(logger, httpServer, grpcServer) - return app, func() { + mainAppContext := newAppContext(app, cqrsBus) + return mainAppContext, func() { cleanup() }, nil } // wire.go: +// appContext holds the Kratos app and CQRS bus together for Wire. +type appContext struct { + App *kratos.App + CQRSBus *watermark.CQRSBus +} + // newApp creates a new Kratos application with HTTP and gRPC servers. func newApp(logger log.Logger, hs *http.Server, gs *grpc.Server) *kratos.App { return kratos.New(kratos.Name("file-system"), kratos.Logger(logger), kratos.Server(hs, gs)) } +// newAppContext wraps the app and CQRSBus into an appContext. +func newAppContext(app *kratos.App, bus *watermark.CQRSBus) *appContext { + return &appContext{App: app, CQRSBus: bus} +} + // newConfServer extracts the Server config from Bootstrap. func newConfServer(bc *conf.Bootstrap) *conf.Server { return bc.GetServer() @@ -67,3 +93,8 @@ func newConfData(bc *conf.Bootstrap) *conf.Data { func newConfAuth(bc *conf.Bootstrap) *conf.Auth { return bc.GetAuth() } + +// newSQLDB extracts the underlying *sql.DB from Data for Watermill. +func newSQLDB(d *data.Data) (*sql.DB, error) { + return d.SqlDB() +} diff --git a/internal/biz/event.go b/internal/biz/event.go new file mode 100644 index 0000000..72ebb80 --- /dev/null +++ b/internal/biz/event.go @@ -0,0 +1,8 @@ +package biz + +import "context" + +// EventPublisher defines the interface for publishing domain events. +type EventPublisher interface { + Publish(ctx context.Context, event interface{}) error +} diff --git a/internal/biz/file.go b/internal/biz/file.go index d7f1412..3485d50 100644 --- a/internal/biz/file.go +++ b/internal/biz/file.go @@ -6,6 +6,7 @@ import ( "time" "rag/file-system/internal/data" + "rag/file-system/internal/watermark" "github.com/go-kratos/kratos/v2/log" ) @@ -29,21 +30,33 @@ type FileRepo interface { // FileUsecase wraps FileRepo and provides file-level business operations. type FileUsecase struct { - repo FileRepo - log *log.Helper + repo FileRepo + eventPub EventPublisher + log *log.Helper } // NewFileUsecase creates a new FileUsecase. -func NewFileUsecase(repo FileRepo, logger log.Logger) *FileUsecase { +func NewFileUsecase(repo FileRepo, eventPub EventPublisher, logger log.Logger) *FileUsecase { return &FileUsecase{ - repo: repo, - log: log.NewHelper(logger), + repo: repo, + eventPub: eventPub, + log: log.NewHelper(logger), } } // UploadFile uploads data to the specified bucket and key. func (uc *FileUsecase) UploadFile(ctx context.Context, bucket, key string, fileData io.Reader) error { - return uc.repo.UploadFile(ctx, bucket, key, fileData) + if err := uc.repo.UploadFile(ctx, bucket, key, fileData); err != nil { + return err + } + // Publish domain event on success. + if err := uc.eventPub.Publish(ctx, &watermark.FileUploadedEvent{ + BucketName: bucket, + ObjectKey: key, + }); err != nil { + uc.log.Errorf("failed to publish FileUploadedEvent: %v", err) + } + return nil } // DownloadFile downloads an object from S3. @@ -58,7 +71,17 @@ func (uc *FileUsecase) GetFileContent(ctx context.Context, bucket, key string) ( // DeleteFile removes a file from S3. func (uc *FileUsecase) DeleteFile(ctx context.Context, bucket, key string) error { - return uc.repo.DeleteFile(ctx, bucket, key) + if err := uc.repo.DeleteFile(ctx, bucket, key); err != nil { + return err + } + // Publish domain event on success. + if err := uc.eventPub.Publish(ctx, &watermark.FileDeletedEvent{ + BucketName: bucket, + ObjectKey: key, + }); err != nil { + uc.log.Errorf("failed to publish FileDeletedEvent: %v", err) + } + return nil } // ListObjectsV2 lists files with pagination support. diff --git a/internal/biz/folder.go b/internal/biz/folder.go index 1cb05d7..64a4dd7 100644 --- a/internal/biz/folder.go +++ b/internal/biz/folder.go @@ -8,6 +8,7 @@ import ( "rag/file-system/internal/data" "rag/file-system/internal/pkg/sanitize" + "rag/file-system/internal/watermark" "github.com/go-kratos/kratos/v2/log" "github.com/google/uuid" @@ -38,15 +39,17 @@ type FolderUsecase struct { folderRepo FolderRepo fileRepo FileMetaRepo s3Repo FileRepo + eventPub EventPublisher log *log.Helper } // NewFolderUsecase creates a new FolderUsecase. -func NewFolderUsecase(folderRepo FolderRepo, fileRepo FileMetaRepo, s3Repo FileRepo, logger log.Logger) *FolderUsecase { +func NewFolderUsecase(folderRepo FolderRepo, fileRepo FileMetaRepo, s3Repo FileRepo, eventPub EventPublisher, logger log.Logger) *FolderUsecase { return &FolderUsecase{ folderRepo: folderRepo, fileRepo: fileRepo, s3Repo: s3Repo, + eventPub: eventPub, log: log.NewHelper(logger), } } @@ -61,6 +64,14 @@ func (uc *FolderUsecase) CreateFolder(ctx context.Context, parentID *string, nam if err := uc.folderRepo.Create(ctx, folder); err != nil { return nil, fmt.Errorf("failed to create folder: %w", err) } + // Publish domain event on success. + if err := uc.eventPub.Publish(ctx, &watermark.FolderCreatedEvent{ + FolderID: folder.ID, + Name: folder.Name, + OwnerID: folder.OwnerID, + }); err != nil { + uc.log.Errorf("failed to publish FolderCreatedEvent: %v", err) + } return folder, nil } @@ -118,6 +129,15 @@ func (uc *FolderUsecase) DeleteFolder(ctx context.Context, id, ownerID string) e if err := uc.folderRepo.Delete(ctx, id, ownerID); err != nil { return fmt.Errorf("failed to delete folder: %w", err) } + + // Publish domain event on success. + if err := uc.eventPub.Publish(ctx, &watermark.FolderDeletedEvent{ + FolderID: id, + OwnerID: ownerID, + }); err != nil { + uc.log.Errorf("failed to publish FolderDeletedEvent: %v", err) + } + return nil } diff --git a/internal/biz/share.go b/internal/biz/share.go index 9aa80e9..c2117a5 100644 --- a/internal/biz/share.go +++ b/internal/biz/share.go @@ -8,6 +8,7 @@ import ( "time" "rag/file-system/internal/data" + "rag/file-system/internal/watermark" "github.com/go-kratos/kratos/v2/log" ) @@ -32,15 +33,17 @@ type ShareUsecase struct { shareRepo ShareRepo fileRepo FileMetaRepoForShare s3Repo FileRepo + eventPub EventPublisher log *log.Helper } // NewShareUsecase creates a new ShareUsecase. -func NewShareUsecase(shareRepo ShareRepo, fileRepo FileMetaRepoForShare, s3Repo FileRepo, logger log.Logger) *ShareUsecase { +func NewShareUsecase(shareRepo ShareRepo, fileRepo FileMetaRepoForShare, s3Repo FileRepo, eventPub EventPublisher, logger log.Logger) *ShareUsecase { return &ShareUsecase{ shareRepo: shareRepo, fileRepo: fileRepo, s3Repo: s3Repo, + eventPub: eventPub, log: log.NewHelper(logger), } } @@ -68,6 +71,18 @@ func (uc *ShareUsecase) CreateShare(ctx context.Context, resourceType, resourceI if err := uc.shareRepo.Create(ctx, share); err != nil { return nil, fmt.Errorf("failed to create share link: %w", err) } + + // Publish domain event on success. + if err := uc.eventPub.Publish(ctx, &watermark.ShareCreatedEvent{ + ShareID: share.ID, + ResourceType: share.ResourceType, + ResourceID: share.ResourceID, + Token: share.Token, + CreatedBy: share.CreatedBy, + }); err != nil { + uc.log.Errorf("failed to publish ShareCreatedEvent: %v", err) + } + return share, nil } diff --git a/internal/data/data.go b/internal/data/data.go index 8bb18f1..1769917 100644 --- a/internal/data/data.go +++ b/internal/data/data.go @@ -2,6 +2,7 @@ package data import ( "context" + "database/sql" "time" "rag/file-system/internal/conf" @@ -47,6 +48,11 @@ func NewData(c *conf.Data, logger log.Logger) (*Data, func(), error) { return &Data{db: db, log: helper}, cleanup, nil } +// SqlDB returns the underlying *sql.DB for use by Watermill. +func (d *Data) SqlDB() (*sql.DB, error) { + return d.db.DB() +} + // DB returns the *gorm.DB for the given context. If a transaction is active // in the context, it returns the transaction DB; otherwise the global DB. func (d *Data) DB(ctx context.Context) *gorm.DB { diff --git a/internal/watermark/cqrs_setup.go b/internal/watermark/cqrs_setup.go index cb532aa..9e3acb8 100644 --- a/internal/watermark/cqrs_setup.go +++ b/internal/watermark/cqrs_setup.go @@ -8,14 +8,19 @@ import ( "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/ThreeDotsLabs/watermill/message" "github.com/go-kratos/kratos/v2/log" + "github.com/google/wire" ) +// CQRSBus holds the Watermill CQRS buses, router, and internal references. type CQRSBus struct { CommandBus *cqrs.CommandBus EventBus *cqrs.EventBus Router *message.Router + subscriber message.Subscriber + wmLogger watermill.LoggerAdapter } +// NewCQRSBus creates a new CQRSBus with PostgreSQL-backed pub/sub. func NewCQRSBus(db *sql.DB, logger log.Logger) (*CQRSBus, error) { helper := log.NewHelper(logger) wmLogger := watermill.NewStdLogger(false, false) @@ -37,9 +42,6 @@ func NewCQRSBus(db *sql.DB, logger log.Logger) (*CQRSBus, error) { return nil, err } - // subscriber will be used later to wire event handlers on the router - _ = subscriber - router, err := message.NewRouter(message.RouterConfig{}, wmLogger) if err != nil { return nil, err @@ -74,5 +76,56 @@ func NewCQRSBus(db *sql.DB, logger log.Logger) (*CQRSBus, error) { CommandBus: commandBus, EventBus: eventBus, Router: router, + subscriber: subscriber, + wmLogger: wmLogger, }, nil } + +// RegisterHandlers sets up all event handlers on the router. +func (b *CQRSBus) RegisterHandlers(handler *CQRSHandler) error { + eventProcessor, err := cqrs.NewEventProcessorWithConfig( + b.Router, + cqrs.EventProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) { + return "events." + params.EventName, nil + }, + SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return b.subscriber, nil + }, + Marshaler: cqrs.JSONMarshaler{}, + Logger: b.wmLogger, + }, + ) + if err != nil { + return err + } + + err = eventProcessor.AddHandlers( + cqrs.NewEventHandler("OnFileUploaded", handler.OnFileUploaded), + cqrs.NewEventHandler("OnFileDeleted", handler.OnFileDeleted), + cqrs.NewEventHandler("OnFolderCreated", handler.OnFolderCreated), + cqrs.NewEventHandler("OnFolderDeleted", handler.OnFolderDeleted), + cqrs.NewEventHandler("OnShareCreated", handler.OnShareCreated), + cqrs.NewEventHandler("OnShareDownloaded", handler.OnShareDownloaded), + ) + if err != nil { + return err + } + + return nil +} + +// ProviderSet is the Wire provider set for the watermark layer. +var ProviderSet = wire.NewSet(NewCQRSBusWithHandlers, NewCQRSHandler, NewEventBusPublisher) + +// NewCQRSBusWithHandlers creates the CQRSBus and registers all event handlers. +func NewCQRSBusWithHandlers(db *sql.DB, handler *CQRSHandler, logger log.Logger) (*CQRSBus, error) { + bus, err := NewCQRSBus(db, logger) + if err != nil { + return nil, err + } + if err := bus.RegisterHandlers(handler); err != nil { + return nil, err + } + return bus, nil +} diff --git a/internal/watermark/handlers.go b/internal/watermark/handlers.go index 92363fe..99b2230 100644 --- a/internal/watermark/handlers.go +++ b/internal/watermark/handlers.go @@ -1,13 +1,54 @@ package watermark import ( + "context" + "github.com/go-kratos/kratos/v2/log" ) +// CQRSHandler holds all Watermill event handlers. type CQRSHandler struct { log *log.Helper } +// NewCQRSHandler creates a new CQRSHandler. func NewCQRSHandler(logger log.Logger) *CQRSHandler { return &CQRSHandler{log: log.NewHelper(logger)} } + +// OnFileUploaded handles the FileUploadedEvent. +func (h *CQRSHandler) OnFileUploaded(ctx context.Context, event *FileUploadedEvent) error { + h.log.Infof("event: file uploaded, bucket=%s key=%s size=%d", event.BucketName, event.ObjectKey, event.Size) + return nil +} + +// OnFileDeleted handles the FileDeletedEvent. +func (h *CQRSHandler) OnFileDeleted(ctx context.Context, event *FileDeletedEvent) error { + h.log.Infof("event: file deleted, bucket=%s key=%s", event.BucketName, event.ObjectKey) + return nil +} + +// OnFolderCreated handles the FolderCreatedEvent. +func (h *CQRSHandler) OnFolderCreated(ctx context.Context, event *FolderCreatedEvent) error { + h.log.Infof("event: folder created, id=%s name=%s owner=%s", event.FolderID, event.Name, event.OwnerID) + return nil +} + +// OnFolderDeleted handles the FolderDeletedEvent. +func (h *CQRSHandler) OnFolderDeleted(ctx context.Context, event *FolderDeletedEvent) error { + h.log.Infof("event: folder deleted, id=%s owner=%s", event.FolderID, event.OwnerID) + return nil +} + +// OnShareCreated handles the ShareCreatedEvent. +func (h *CQRSHandler) OnShareCreated(ctx context.Context, event *ShareCreatedEvent) error { + h.log.Infof("event: share created, id=%s token=%s resource=%s/%s by=%s", + event.ShareID, event.Token, event.ResourceType, event.ResourceID, event.CreatedBy) + return nil +} + +// OnShareDownloaded handles the ShareDownloadedEvent. +func (h *CQRSHandler) OnShareDownloaded(ctx context.Context, event *ShareDownloadedEvent) error { + h.log.Infof("event: share downloaded, token=%s", event.Token) + return nil +} diff --git a/internal/watermark/publisher.go b/internal/watermark/publisher.go new file mode 100644 index 0000000..2ed007d --- /dev/null +++ b/internal/watermark/publisher.go @@ -0,0 +1,20 @@ +package watermark + +import ( + "context" +) + +// EventBusPublisher implements biz.EventPublisher using Watermill EventBus. +type EventBusPublisher struct { + bus *CQRSBus +} + +// NewEventBusPublisher creates a new EventBusPublisher from a CQRSBus. +func NewEventBusPublisher(bus *CQRSBus) *EventBusPublisher { + return &EventBusPublisher{bus: bus} +} + +// Publish publishes a domain event via the Watermill EventBus. +func (p *EventBusPublisher) Publish(ctx context.Context, event interface{}) error { + return p.bus.EventBus.Publish(ctx, event) +}