diff --git a/cmd/server/main.go b/cmd/server/main.go index d00d475..98f2ca0 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -1,277 +1,54 @@ package main import ( - "context" - "io" - "log" - "net/http" + "flag" "os" - "os/signal" - "syscall" - "time" - _ "rag/file-system/docs" - "rag/file-system/internal/api/endpoints" - "rag/file-system/internal/api/handlers" - "rag/file-system/internal/api/validators" - "rag/file-system/internal/common" - "rag/file-system/internal/domain/model" - "rag/file-system/internal/domain/repository" - "rag/file-system/internal/infrastructure/database" - grpcAuth "rag/file-system/internal/infrastructure/grpc" - "rag/file-system/internal/infrastructure/mediator" - dbrepo "rag/file-system/internal/infrastructure/repository" - s3infra "rag/file-system/internal/infrastructure/s3" - "rag/file-system/internal/middleware" + "rag/file-system/internal/conf" - "github.com/gin-contrib/cors" - "github.com/gin-gonic/gin" - swaggerFiles "github.com/swaggo/files" - ginSwagger "github.com/swaggo/gin-swagger" - "go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin" + "github.com/go-kratos/kratos/v2/config" + fileconfig "github.com/go-kratos/kratos/v2/config/file" + "github.com/go-kratos/kratos/v2/log" ) -// @title RustFS File System API -// @version 1.3 -// @description RustFS file storage API with multipart upload, file preview, and paginated queries. -// @host localhost:8080 -// @BasePath / -// @securityDefinitions.apikey ApiKeyAuth -// @in header -// @name X-API-Key -func main() { - common.InitLogger() +var ( + flagconf string +) - cfg := common.LoadConfig() - if err := cfg.Validate(); err != nil { - common.Logger.Error("configuration error", "error", err) - os.Exit(1) - } - - // OpenTelemetry - otelShutdown := common.InitOTel(context.Background(), cfg.OTelEndpoint) - defer func() { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := otelShutdown(ctx); err != nil { - common.Logger.Error("OTel shutdown error", "error", err) - } - }() - - // Infrastructure — S3 - rustfsClient := s3infra.NewRustFSClient(cfg) - s3Repo := s3infra.NewS3FileRepository(rustfsClient) - m := mediator.NewMediator() - - // Infrastructure — PostgreSQL - pgDB, err := database.NewPostgresDB(cfg.DatabaseURL) - if err != nil { - common.Logger.Error("failed to connect to database", "error", err) - os.Exit(1) - } - defer pgDB.Close() - - if err := database.RunMigrations(pgDB); err != nil { - common.Logger.Error("failed to run migrations", "error", err) - os.Exit(1) - } - - folderRepo := dbrepo.NewFolderRepository(pgDB) - fileMetaRepo := dbrepo.NewFileMetaRepository(pgDB) - shareRepo := dbrepo.NewShareRepository(pgDB) - - // gRPC Auth Client (if configured) - var authClient *grpcAuth.AuthClient - if cfg.GRPCAuthAddr != "" { - var err error - authClient, err = grpcAuth.NewAuthClient(cfg.GRPCAuthAddr) - if err != nil { - common.Logger.Error("failed to connect to auth gRPC server", "error", err) - os.Exit(1) - } - defer authClient.Close() - common.Logger.Info("gRPC auth client connected", "addr", cfg.GRPCAuthAddr) - } - - // Handlers - uploadHandler := handlers.NewUploadFileHandler(s3Repo) - downloadHandler := handlers.NewDownloadFileHandler(s3Repo) - createBucketHandler := handlers.NewCreateBucketHandler(s3Repo) - listBucketsHandler := handlers.NewListBucketsHandler(s3Repo) - deleteBucketHandler := handlers.NewDeleteBucketHandler(s3Repo) - listFilesHandler := handlers.NewListFilesHandler(s3Repo) - previewHandler := handlers.NewGetFilePreviewHandler(s3Repo) - initMultipartHandler := handlers.NewInitMultipartHandler(s3Repo) - uploadPartHandler := handlers.NewUploadPartHandler(s3Repo) - completeMultipartHandler := handlers.NewCompleteMultipartHandler(s3Repo) - deleteFileHandler := handlers.NewDeleteFileHandler(s3Repo) - abortMultipartHandler := handlers.NewAbortMultipartHandler(s3Repo) - fileContentHandler := handlers.NewGetFileContentHandler(s3Repo) - loginHandler := handlers.NewLoginHandler(cfg.AuthAPIKey) - - // Register Handlers - mediator.Register[handlers.UploadFileCommand, string](m, uploadHandler) - mediator.Register[handlers.DownloadFileQuery, io.ReadCloser](m, downloadHandler) - mediator.Register[handlers.CreateBucketCommand, string](m, createBucketHandler) - mediator.Register[handlers.ListBucketsQuery, []string](m, listBucketsHandler) - mediator.Register[handlers.DeleteBucketCommand, string](m, deleteBucketHandler) - mediator.Register[handlers.ListFilesQuery, *repository.ListFilesResult](m, listFilesHandler) - mediator.Register[handlers.GetFilePreviewQuery, string](m, previewHandler) - mediator.Register[handlers.InitMultipartCommand, string](m, initMultipartHandler) - mediator.Register[handlers.UploadPartCommand, string](m, uploadPartHandler) - mediator.Register[handlers.CompleteMultipartCommand, string](m, completeMultipartHandler) - mediator.Register[handlers.DeleteFileCommand, string](m, deleteFileHandler) - mediator.Register[handlers.AbortMultipartCommand, string](m, abortMultipartHandler) - mediator.Register[handlers.GetFileContentQuery, string](m, fileContentHandler) - mediator.Register[handlers.LoginQuery, handlers.LoginResult](m, loginHandler) - - // --- New folder/file/share handlers --- - createFolderHandler := handlers.NewCreateFolderHandler(folderRepo) - renameFolderHandler := handlers.NewRenameFolderHandler(folderRepo) - deleteFolderHandler := handlers.NewDeleteFolderHandler(folderRepo, s3Repo) - getFolderHandler := handlers.NewGetFolderHandler(folderRepo) - getFolderTreeHandler := handlers.NewGetFolderTreeHandler(folderRepo) - uploadToFolderHandler := handlers.NewUploadToFolderHandler(fileMetaRepo, folderRepo, s3Repo) - moveFileHandler := handlers.NewMoveFileHandler(fileMetaRepo) - createShareHandler := handlers.NewCreateShareHandler(shareRepo) - deleteShareHandler := handlers.NewDeleteShareHandler(shareRepo) - getShareInfoHandler := handlers.NewGetShareInfoHandler(shareRepo, fileMetaRepo) - downloadShareHandler := handlers.NewDownloadShareHandler(shareRepo, fileMetaRepo, s3Repo) - - mediator.Register[handlers.CreateFolderCommand, *model.Folder](m, createFolderHandler) - mediator.Register[handlers.RenameFolderCommand, *model.Folder](m, renameFolderHandler) - mediator.Register[handlers.DeleteFolderCommand, string](m, deleteFolderHandler) - mediator.Register[handlers.GetFolderQuery, *model.FolderWithChildren](m, getFolderHandler) - mediator.Register[handlers.GetFolderTreeQuery, []model.Folder](m, getFolderTreeHandler) - mediator.Register[handlers.UploadToFolderCommand, *model.FileMeta](m, uploadToFolderHandler) - mediator.Register[handlers.MoveFileCommand, string](m, moveFileHandler) - mediator.Register[handlers.CreateShareCommand, *model.ShareLink](m, createShareHandler) - mediator.Register[handlers.DeleteShareCommand, string](m, deleteShareHandler) - mediator.Register[handlers.GetShareInfoQuery, *model.ShareInfo](m, getShareInfoHandler) - mediator.Register[handlers.DownloadShareQuery, string](m, downloadShareHandler) - - // Validators - fileValidator := validators.NewFileValidator() - createBucketValidator := validators.NewCreateBucketValidator() - folderValidator := validators.NewFolderValidator() - shareValidator := validators.NewShareValidator() - - // Endpoints - fileEndpoint := endpoints.NewFileEndpoint(m, fileValidator) - bucketEndpoint := endpoints.NewBucketEndpoint(m, createBucketValidator) - authEndpoint := endpoints.NewAuthEndpoint(m) - folderEndpoint := endpoints.NewFolderEndpoint(m, folderValidator) - shareEndpoint := endpoints.NewShareEndpoint(m, shareValidator) - - // Router - r := gin.Default() - r.MaxMultipartMemory = 32 << 20 // 32MB - - // OpenTelemetry Gin middleware - r.Use(otelgin.Middleware("file-system")) - - // Middleware - r.Use(middleware.RequestIDMiddleware()) - r.Use(middleware.LoggingMiddleware()) - r.Use(middleware.TimeoutMiddleware(time.Duration(cfg.RequestTimeout) * time.Second)) - - // CORS - corsConfig := cors.Config{ - AllowOrigins: []string{"*"}, - AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"}, - AllowHeaders: []string{"Content-Type", "Content-Length", "X-API-Key", "X-Request-ID", "Authorization"}, - AllowCredentials: false, - } - r.Use(cors.New(corsConfig)) - - // Health check - r.GET("/health", func(c *gin.Context) { - c.JSON(http.StatusOK, gin.H{ - "status": "ok", - "time": time.Now().UTC(), - }) - }) - - // Public endpoints (no auth, rate limited) - r.POST("/auth/login", middleware.RateLimitMiddleware(1, 5), authEndpoint.Login) - - // Public share access (no auth) - r.GET("/share/:token", shareEndpoint.GetShareInfo) - r.POST("/share/:token/download", shareEndpoint.DownloadShare) - - // API group with auth middleware - api := r.Group("/") - if authClient != nil { - api.Use(middleware.JWTAuthMiddleware(authClient)) - } else { - api.Use(middleware.AuthMiddleware(cfg.AuthAPIKey)) - } - { - // File operations - api.POST("/files/upload", fileEndpoint.UploadFile) - api.GET("/files/download", fileEndpoint.DownloadFile) - api.GET("/files/list", fileEndpoint.ListFiles) - api.GET("/files/preview", fileEndpoint.GetPreviewURL) - api.GET("/files/content", fileEndpoint.GetFileContent) - api.DELETE("/files/delete", fileEndpoint.DeleteFile) - - // Multipart Upload - api.POST("/files/multipart/init", fileEndpoint.InitMultipart) - api.PUT("/files/multipart/part", fileEndpoint.UploadPart) - api.POST("/files/multipart/complete", fileEndpoint.CompleteMultipart) - api.POST("/files/multipart/abort", fileEndpoint.AbortMultipart) - - // Bucket operations - api.POST("/buckets", bucketEndpoint.CreateBucket) - api.GET("/buckets", bucketEndpoint.ListBuckets) - api.DELETE("/buckets", bucketEndpoint.DeleteBucket) - - // Folder operations - api.POST("/folders", folderEndpoint.CreateFolder) - api.GET("/folders/tree", folderEndpoint.GetFolderTree) - api.GET("/folders/:id", folderEndpoint.GetFolder) - api.PUT("/folders/:id", folderEndpoint.RenameFolder) - api.DELETE("/folders/:id", folderEndpoint.DeleteFolder) - api.POST("/folders/:folderId/files", folderEndpoint.UploadToFolder) - api.POST("/files/:id/move", folderEndpoint.MoveFile) - - // Share management (auth required) - api.POST("/share", shareEndpoint.CreateShare) - api.DELETE("/share/:id", shareEndpoint.DeleteShare) - } - - // Swagger - r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) - - // Web UI - r.Static("/web", "./web") - r.GET("/", func(c *gin.Context) { - c.Redirect(http.StatusMovedPermanently, "/web") - }) - - // Graceful shutdown - srv := &http.Server{ - Addr: ":" + cfg.ServerPort, - Handler: r, - } - - go func() { - common.Logger.Info("server starting", "port", cfg.ServerPort) - if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { - common.Logger.Error("server failed", "error", err) - os.Exit(1) - } - }() - - quit := make(chan os.Signal, 1) - signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) - <-quit - common.Logger.Info("shutting down server...") - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - if err := srv.Shutdown(ctx); err != nil { - log.Printf("server forced shutdown: %v", err) - } - common.Logger.Info("server exited") +func init() { + flag.StringVar(&flagconf, "conf", "configs/config.yaml", "config path, eg: -conf config.yaml") +} + +func main() { + flag.Parse() + + logger := log.With(log.NewStdLogger(os.Stdout), + "ts", log.DefaultTimestamp, + "caller", log.DefaultCaller, + "service.kind", "file-system", + ) + + c := config.New( + config.WithSource( + fileconfig.NewSource(flagconf), + ), + ) + if err := c.Load(); err != nil { + panic(err) + } + + var bc conf.Bootstrap + if err := c.Scan(&bc); err != nil { + panic(err) + } + + app, cleanup, err := initApp(&bc, logger) + if err != nil { + panic(err) + } + defer cleanup() + + if err := app.Run(); err != nil { + panic(err) + } } diff --git a/cmd/server/wire.go b/cmd/server/wire.go new file mode 100644 index 0000000..caeda2f --- /dev/null +++ b/cmd/server/wire.go @@ -0,0 +1,61 @@ +//go:build wireinject + +package main + +import ( + "rag/file-system/internal/biz" + "rag/file-system/internal/conf" + "rag/file-system/internal/data" + "rag/file-system/internal/server" + "rag/file-system/internal/service" + + "github.com/go-kratos/kratos/v2" + "github.com/go-kratos/kratos/v2/log" + "github.com/go-kratos/kratos/v2/transport/grpc" + "github.com/go-kratos/kratos/v2/transport/http" + "github.com/google/wire" +) + +// newApp creates a new Kratos application with HTTP and gRPC servers. +func newApp(logger log.Logger, hs *http.Server, gs *grpc.Server) *kratos.App { + return kratos.New( + kratos.Name("file-system"), + kratos.Logger(logger), + kratos.Server(hs, gs), + ) +} + +// newConfServer extracts the Server config from Bootstrap. +func newConfServer(bc *conf.Bootstrap) *conf.Server { + return bc.GetServer() +} + +// newConfData extracts the Data config from Bootstrap. +func newConfData(bc *conf.Bootstrap) *conf.Data { + return bc.GetData() +} + +// initApp wires up the entire dependency graph. +func initApp(*conf.Bootstrap, log.Logger) (*kratos.App, func(), error) { + panic(wire.Build( + // Config extraction + newConfServer, + newConfData, + + // Provider sets from each layer + data.ProviderSet, + biz.ProviderSet, + service.ProviderSet, + server.ProviderSet, + + // Interface bindings: biz interfaces → data implementations + wire.Bind(new(biz.FileRepo), new(*data.FileRepo)), + wire.Bind(new(biz.FolderRepo), new(*data.FolderRepo)), + wire.Bind(new(biz.FileMetaRepo), new(*data.FileMetaRepo)), + wire.Bind(new(biz.FileMetaRepoForShare), new(*data.FileMetaRepo)), + wire.Bind(new(biz.ShareRepo), new(*data.ShareRepo)), + + // App constructor + newApp, + )) +} diff --git a/cmd/server/wire_gen.go b/cmd/server/wire_gen.go new file mode 100644 index 0000000..a1c96c4 --- /dev/null +++ b/cmd/server/wire_gen.go @@ -0,0 +1,63 @@ +// Code generated by Wire. DO NOT EDIT. + +//go:generate go run -mod=mod github.com/google/wire/cmd/wire +//go:build !wireinject +// +build !wireinject + +package main + +import ( + "github.com/go-kratos/kratos/v2" + "github.com/go-kratos/kratos/v2/log" + "github.com/go-kratos/kratos/v2/transport/grpc" + "github.com/go-kratos/kratos/v2/transport/http" + "rag/file-system/internal/biz" + "rag/file-system/internal/conf" + "rag/file-system/internal/data" + "rag/file-system/internal/server" + "rag/file-system/internal/service" +) + +// Injectors from wire.go: + +// initApp wires up the entire dependency graph. +func initApp(bootstrap *conf.Bootstrap, logger log.Logger) (*kratos.App, func(), error) { + confServer := newConfServer(bootstrap) + confData := newConfData(bootstrap) + fileRepo := data.NewFileRepo(confData) + fileUsecase := biz.NewFileUsecase(fileRepo, logger) + bucketUsecase := biz.NewBucketUsecase(fileRepo, logger) + dataData, cleanup, err := data.NewData(confData, logger) + if err != nil { + return nil, nil, err + } + folderRepo := data.NewFolderRepo(dataData, logger) + fileMetaRepo := data.NewFileMetaRepo(dataData, logger) + folderUsecase := biz.NewFolderUsecase(folderRepo, fileMetaRepo, fileRepo, logger) + shareRepo := data.NewShareRepo(dataData, logger) + shareUsecase := biz.NewShareUsecase(shareRepo, fileMetaRepo, fileRepo, logger) + fileService := service.NewFileService(fileUsecase, bucketUsecase, folderUsecase, shareUsecase, logger) + httpServer := server.NewHTTPServer(confServer, fileService, logger) + grpcServer := server.NewGRPCServer(confServer, fileService, logger) + app := newApp(logger, httpServer, grpcServer) + return app, func() { + cleanup() + }, nil +} + +// wire.go: + +// newApp creates a new Kratos application with HTTP and gRPC servers. +func newApp(logger log.Logger, hs *http.Server, gs *grpc.Server) *kratos.App { + return kratos.New(kratos.Name("file-system"), kratos.Logger(logger), kratos.Server(hs, gs)) +} + +// newConfServer extracts the Server config from Bootstrap. +func newConfServer(bc *conf.Bootstrap) *conf.Server { + return bc.GetServer() +} + +// newConfData extracts the Data config from Bootstrap. +func newConfData(bc *conf.Bootstrap) *conf.Data { + return bc.GetData() +} diff --git a/internal/deps.go b/internal/deps.go deleted file mode 100644 index 5901823..0000000 --- a/internal/deps.go +++ /dev/null @@ -1,31 +0,0 @@ -// Package internal contains dependency blank imports for the Kratos migration. -// These ensure go mod tidy keeps the required packages until the migration -// is complete and actual code imports them directly. -// This file will be removed after the migration is finished. -package internal - -import ( - // Kratos framework - core, transports, config, logging - _ "github.com/go-kratos/kratos/v2" - _ "github.com/go-kratos/kratos/v2/config" - _ "github.com/go-kratos/kratos/v2/config/file" - _ "github.com/go-kratos/kratos/v2/log" - _ "github.com/go-kratos/kratos/v2/middleware/auth/jwt" - _ "github.com/go-kratos/kratos/v2/middleware/logging" - _ "github.com/go-kratos/kratos/v2/middleware/recovery" - _ "github.com/go-kratos/kratos/v2/middleware/tracing" - _ "github.com/go-kratos/kratos/v2/middleware/validate" - _ "github.com/go-kratos/kratos/v2/transport/grpc" - _ "github.com/go-kratos/kratos/v2/transport/http" - - // Wire DI - _ "github.com/google/wire" - - // Watermill - event-driven CQRS - _ "github.com/ThreeDotsLabs/watermill" - _ "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql" - - // GORM - ORM for PostgreSQL - _ "gorm.io/driver/postgres" - _ "gorm.io/gorm" -)