向宁 3a18ca0579 feat: add directory structure and file sharing support
- PostgreSQL metadata overlay layer on top of existing S3 storage
- 3 new tables: folders, files, share_links
- Folder CRUD: create, get with children, tree, rename, delete (cascade)
- File operations: upload to folder, move between folders
- Share links: create with optional password/expiry/download limit, public access
- S3 compensation on PG write failure
- Existing 14 endpoints untouched
2026-05-20 20:26:19 +08:00

278 lines
10 KiB
Go

package main
import (
"context"
"io"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
_ "rag/file-system/docs"
"rag/file-system/internal/api/endpoints"
"rag/file-system/internal/api/handlers"
"rag/file-system/internal/api/validators"
"rag/file-system/internal/common"
"rag/file-system/internal/domain/model"
"rag/file-system/internal/domain/repository"
"rag/file-system/internal/infrastructure/database"
grpcAuth "rag/file-system/internal/infrastructure/grpc"
"rag/file-system/internal/infrastructure/mediator"
dbrepo "rag/file-system/internal/infrastructure/repository"
s3infra "rag/file-system/internal/infrastructure/s3"
"rag/file-system/internal/middleware"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
swaggerFiles "github.com/swaggo/files"
ginSwagger "github.com/swaggo/gin-swagger"
"go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin"
)
// @title RustFS File System API
// @version 1.3
// @description RustFS file storage API with multipart upload, file preview, and paginated queries.
// @host localhost:8080
// @BasePath /
// @securityDefinitions.apikey ApiKeyAuth
// @in header
// @name X-API-Key
func main() {
	// All work happens in run so that its deferred cleanups (OTel shutdown,
	// database close, gRPC auth client close) execute before the process
	// exits: os.Exit terminates immediately without running deferred calls.
	os.Exit(run())
}

// run loads configuration, builds the S3/PostgreSQL/gRPC dependencies,
// registers every mediator handler and HTTP route, and serves until SIGINT or
// SIGTERM arrives or the listener fails.
//
// It returns the process exit code: 0 after a signal-triggered shutdown, 1 when
// configuration, database, migration, auth-client or listener setup fails.
func run() int {
	common.InitLogger()
	cfg := common.LoadConfig()
	if err := cfg.Validate(); err != nil {
		common.Logger.Error("configuration error", "error", err)
		return 1
	}

	// OpenTelemetry
	otelShutdown := common.InitOTel(context.Background(), cfg.OTelEndpoint)
	defer func() {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := otelShutdown(ctx); err != nil {
			common.Logger.Error("OTel shutdown error", "error", err)
		}
	}()

	// Infrastructure — S3
	rustfsClient := s3infra.NewRustFSClient(cfg)
	s3Repo := s3infra.NewS3FileRepository(rustfsClient)
	m := mediator.NewMediator()

	// Infrastructure — PostgreSQL
	pgDB, err := database.NewPostgresDB(cfg.DatabaseURL)
	if err != nil {
		common.Logger.Error("failed to connect to database", "error", err)
		return 1
	}
	defer pgDB.Close()

	if err := database.RunMigrations(pgDB); err != nil {
		common.Logger.Error("failed to run migrations", "error", err)
		return 1
	}

	folderRepo := dbrepo.NewFolderRepository(pgDB)
	fileMetaRepo := dbrepo.NewFileMetaRepository(pgDB)
	shareRepo := dbrepo.NewShareRepository(pgDB)

	// gRPC Auth Client (if configured). When no address is set, authClient
	// stays nil and the API group below falls back to API-key auth.
	var authClient *grpcAuth.AuthClient
	if cfg.GRPCAuthAddr != "" {
		authClient, err = grpcAuth.NewAuthClient(cfg.GRPCAuthAddr)
		if err != nil {
			common.Logger.Error("failed to connect to auth gRPC server", "error", err)
			return 1
		}
		defer authClient.Close()
		common.Logger.Info("gRPC auth client connected", "addr", cfg.GRPCAuthAddr)
	}

	// Handlers
	uploadHandler := handlers.NewUploadFileHandler(s3Repo)
	downloadHandler := handlers.NewDownloadFileHandler(s3Repo)
	createBucketHandler := handlers.NewCreateBucketHandler(s3Repo)
	listBucketsHandler := handlers.NewListBucketsHandler(s3Repo)
	deleteBucketHandler := handlers.NewDeleteBucketHandler(s3Repo)
	listFilesHandler := handlers.NewListFilesHandler(s3Repo)
	previewHandler := handlers.NewGetFilePreviewHandler(s3Repo)
	initMultipartHandler := handlers.NewInitMultipartHandler(s3Repo)
	uploadPartHandler := handlers.NewUploadPartHandler(s3Repo)
	completeMultipartHandler := handlers.NewCompleteMultipartHandler(s3Repo)
	deleteFileHandler := handlers.NewDeleteFileHandler(s3Repo)
	abortMultipartHandler := handlers.NewAbortMultipartHandler(s3Repo)
	fileContentHandler := handlers.NewGetFileContentHandler(s3Repo)
	loginHandler := handlers.NewLoginHandler(cfg.AuthAPIKey)

	// Register Handlers
	mediator.Register[handlers.UploadFileCommand, string](m, uploadHandler)
	mediator.Register[handlers.DownloadFileQuery, io.ReadCloser](m, downloadHandler)
	mediator.Register[handlers.CreateBucketCommand, string](m, createBucketHandler)
	mediator.Register[handlers.ListBucketsQuery, []string](m, listBucketsHandler)
	mediator.Register[handlers.DeleteBucketCommand, string](m, deleteBucketHandler)
	mediator.Register[handlers.ListFilesQuery, *repository.ListFilesResult](m, listFilesHandler)
	mediator.Register[handlers.GetFilePreviewQuery, string](m, previewHandler)
	mediator.Register[handlers.InitMultipartCommand, string](m, initMultipartHandler)
	mediator.Register[handlers.UploadPartCommand, string](m, uploadPartHandler)
	mediator.Register[handlers.CompleteMultipartCommand, string](m, completeMultipartHandler)
	mediator.Register[handlers.DeleteFileCommand, string](m, deleteFileHandler)
	mediator.Register[handlers.AbortMultipartCommand, string](m, abortMultipartHandler)
	mediator.Register[handlers.GetFileContentQuery, string](m, fileContentHandler)
	mediator.Register[handlers.LoginQuery, handlers.LoginResult](m, loginHandler)

	// --- New folder/file/share handlers ---
	createFolderHandler := handlers.NewCreateFolderHandler(folderRepo)
	renameFolderHandler := handlers.NewRenameFolderHandler(folderRepo)
	deleteFolderHandler := handlers.NewDeleteFolderHandler(folderRepo, s3Repo)
	getFolderHandler := handlers.NewGetFolderHandler(folderRepo)
	getFolderTreeHandler := handlers.NewGetFolderTreeHandler(folderRepo)
	uploadToFolderHandler := handlers.NewUploadToFolderHandler(fileMetaRepo, folderRepo, s3Repo)
	moveFileHandler := handlers.NewMoveFileHandler(fileMetaRepo)
	createShareHandler := handlers.NewCreateShareHandler(shareRepo)
	deleteShareHandler := handlers.NewDeleteShareHandler(shareRepo)
	getShareInfoHandler := handlers.NewGetShareInfoHandler(shareRepo, fileMetaRepo)
	downloadShareHandler := handlers.NewDownloadShareHandler(shareRepo, fileMetaRepo, s3Repo)

	mediator.Register[handlers.CreateFolderCommand, *model.Folder](m, createFolderHandler)
	mediator.Register[handlers.RenameFolderCommand, *model.Folder](m, renameFolderHandler)
	mediator.Register[handlers.DeleteFolderCommand, string](m, deleteFolderHandler)
	mediator.Register[handlers.GetFolderQuery, *model.FolderWithChildren](m, getFolderHandler)
	mediator.Register[handlers.GetFolderTreeQuery, []model.Folder](m, getFolderTreeHandler)
	mediator.Register[handlers.UploadToFolderCommand, *model.FileMeta](m, uploadToFolderHandler)
	mediator.Register[handlers.MoveFileCommand, string](m, moveFileHandler)
	mediator.Register[handlers.CreateShareCommand, *model.ShareLink](m, createShareHandler)
	mediator.Register[handlers.DeleteShareCommand, string](m, deleteShareHandler)
	mediator.Register[handlers.GetShareInfoQuery, *model.ShareInfo](m, getShareInfoHandler)
	mediator.Register[handlers.DownloadShareQuery, string](m, downloadShareHandler)

	// Validators
	fileValidator := validators.NewFileValidator()
	createBucketValidator := validators.NewCreateBucketValidator()
	folderValidator := validators.NewFolderValidator()
	shareValidator := validators.NewShareValidator()

	// Endpoints
	fileEndpoint := endpoints.NewFileEndpoint(m, fileValidator)
	bucketEndpoint := endpoints.NewBucketEndpoint(m, createBucketValidator)
	authEndpoint := endpoints.NewAuthEndpoint(m)
	folderEndpoint := endpoints.NewFolderEndpoint(m, folderValidator)
	shareEndpoint := endpoints.NewShareEndpoint(m, shareValidator)

	// Router
	r := gin.Default()
	r.MaxMultipartMemory = 32 << 20 // 32MB

	// OpenTelemetry Gin middleware
	r.Use(otelgin.Middleware("file-system"))

	// Middleware
	r.Use(middleware.RequestIDMiddleware())
	r.Use(middleware.LoggingMiddleware())
	r.Use(middleware.TimeoutMiddleware(time.Duration(cfg.RequestTimeout) * time.Second))

	// CORS
	corsConfig := cors.Config{
		AllowOrigins:     []string{"*"},
		AllowMethods:     []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
		AllowHeaders:     []string{"Content-Type", "Content-Length", "X-API-Key", "X-Request-ID", "Authorization"},
		AllowCredentials: false,
	}
	r.Use(cors.New(corsConfig))

	// Health check
	r.GET("/health", func(c *gin.Context) {
		c.JSON(http.StatusOK, gin.H{
			"status": "ok",
			"time":   time.Now().UTC(),
		})
	})

	// Public endpoints (no auth, rate limited)
	r.POST("/auth/login", middleware.RateLimitMiddleware(1, 5), authEndpoint.Login)

	// Public share access (no auth)
	// NOTE(review): these routes carry no rate limiting, unlike /auth/login —
	// confirm that is intended for password-protected share links.
	r.GET("/share/:token", shareEndpoint.GetShareInfo)
	r.POST("/share/:token/download", shareEndpoint.DownloadShare)

	// API group with auth middleware: JWT via the gRPC auth client when one
	// is configured, otherwise the static API key.
	api := r.Group("/")
	if authClient != nil {
		api.Use(middleware.JWTAuthMiddleware(authClient))
	} else {
		api.Use(middleware.AuthMiddleware(cfg.AuthAPIKey))
	}
	{
		// File operations
		api.POST("/files/upload", fileEndpoint.UploadFile)
		api.GET("/files/download", fileEndpoint.DownloadFile)
		api.GET("/files/list", fileEndpoint.ListFiles)
		api.GET("/files/preview", fileEndpoint.GetPreviewURL)
		api.GET("/files/content", fileEndpoint.GetFileContent)
		api.DELETE("/files/delete", fileEndpoint.DeleteFile)

		// Multipart Upload
		api.POST("/files/multipart/init", fileEndpoint.InitMultipart)
		api.PUT("/files/multipart/part", fileEndpoint.UploadPart)
		api.POST("/files/multipart/complete", fileEndpoint.CompleteMultipart)
		api.POST("/files/multipart/abort", fileEndpoint.AbortMultipart)

		// Bucket operations
		api.POST("/buckets", bucketEndpoint.CreateBucket)
		api.GET("/buckets", bucketEndpoint.ListBuckets)
		api.DELETE("/buckets", bucketEndpoint.DeleteBucket)

		// Folder operations
		api.POST("/folders", folderEndpoint.CreateFolder)
		api.GET("/folders/tree", folderEndpoint.GetFolderTree)
		api.GET("/folders/:id", folderEndpoint.GetFolder)
		api.PUT("/folders/:id", folderEndpoint.RenameFolder)
		api.DELETE("/folders/:id", folderEndpoint.DeleteFolder)
		api.POST("/folders/:folderId/files", folderEndpoint.UploadToFolder)
		api.POST("/files/:id/move", folderEndpoint.MoveFile)

		// Share management (auth required)
		api.POST("/share", shareEndpoint.CreateShare)
		api.DELETE("/share/:id", shareEndpoint.DeleteShare)
	}

	// Swagger
	r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))

	// Web UI
	r.Static("/web", "./web")
	r.GET("/", func(c *gin.Context) {
		c.Redirect(http.StatusMovedPermanently, "/web")
	})

	// HTTP server. Only the header read is bounded: it stops clients from
	// holding connections open by trickling headers, while leaving request
	// bodies and responses (uploads/downloads) without a server-level deadline.
	srv := &http.Server{
		Addr:              ":" + cfg.ServerPort,
		Handler:           r,
		ReadHeaderTimeout: 10 * time.Second,
	}

	// The listener goroutine reports a fatal error through serverErr instead
	// of exiting the process itself, so the deferred cleanups above still run.
	// The buffer of 1 lets the goroutine finish even if nobody receives.
	serverErr := make(chan error, 1)
	go func() {
		common.Logger.Info("server starting", "port", cfg.ServerPort)
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			serverErr <- err
		}
	}()

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

	select {
	case err := <-serverErr:
		common.Logger.Error("server failed", "error", err)
		return 1
	case <-quit:
	}

	// Graceful shutdown: give in-flight requests up to 10 seconds to finish.
	common.Logger.Info("shutting down server...")
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		// NOTE(review): every other message goes through common.Logger; this
		// call is the file's only use of the "log" import, so switching it
		// also requires dropping that import.
		log.Printf("server forced shutdown: %v", err)
	}
	common.Logger.Info("server exited")
	return 0
}