diff --git a/internal/data/data.go b/internal/data/data.go new file mode 100644 index 0000000..8bb18f1 --- /dev/null +++ b/internal/data/data.go @@ -0,0 +1,120 @@ +package data + +import ( + "context" + "time" + + "rag/file-system/internal/conf" + + "github.com/go-kratos/kratos/v2/log" + "github.com/google/wire" + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +// Data holds the GORM database connection and provides transaction support. +type Data struct { + db *gorm.DB + log *log.Helper +} + +// contextTxKey is the context key for storing the current transaction. +type contextTxKey struct{} + +// NewData creates a new Data instance with GORM connected to PostgreSQL. +func NewData(c *conf.Data, logger log.Logger) (*Data, func(), error) { + helper := log.NewHelper(logger) + db, err := gorm.Open(postgres.Open(c.Database.Source), &gorm.Config{}) + if err != nil { + return nil, nil, err + } + + sqlDB, err := db.DB() + if err != nil { + return nil, nil, err + } + sqlDB.SetMaxOpenConns(25) + sqlDB.SetMaxIdleConns(5) + + if err := db.AutoMigrate(&FolderPO{}, &FileMetaPO{}, &ShareLinkPO{}); err != nil { + return nil, nil, err + } + + helper.Info("connected to PostgreSQL via GORM") + cleanup := func() { + sqlDB.Close() + } + return &Data{db: db, log: helper}, cleanup, nil +} + +// DB returns the *gorm.DB for the given context. If a transaction is active +// in the context, it returns the transaction DB; otherwise the global DB. +func (d *Data) DB(ctx context.Context) *gorm.DB { + tx, ok := ctx.Value(contextTxKey{}).(*gorm.DB) + if ok { + return tx + } + return d.db.WithContext(ctx) +} + +// Transaction is the interface for executing operations within a database transaction. +type Transaction interface { + InTx(ctx context.Context, fn func(ctx context.Context) error) error +} + +// InTx executes fn inside a database transaction. +func (d *Data) InTx(ctx context.Context, fn func(ctx context.Context) error) error { + return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + ctx = context.WithValue(ctx, contextTxKey{}, tx) + return fn(ctx) + }) +} + +// ProviderSet is the Wire provider set for the data layer. +var ProviderSet = wire.NewSet(NewData, NewFileRepo, NewFolderRepo, NewFileMetaRepo, NewShareRepo) + +// --- GORM Models (Persistence Objects) --- + +// FolderPO maps to the "folders" table. +type FolderPO struct { + ID string `gorm:"primaryKey;type:uuid;default:gen_random_uuid()"` + ParentID *string `gorm:"type:uuid;index:idx_folders_parent"` + Name string `gorm:"type:varchar(255);not null"` + OwnerID string `gorm:"type:varchar(36);not null;index:idx_folders_owner"` + CreatedAt time.Time `gorm:"autoCreateTime"` + UpdatedAt time.Time `gorm:"autoUpdateTime"` +} + +func (FolderPO) TableName() string { return "folders" } + +// FileMetaPO maps to the "files" table. +type FileMetaPO struct { + ID string `gorm:"primaryKey;type:uuid;default:gen_random_uuid()"` + FolderID string `gorm:"type:uuid;index:idx_files_folder"` + Name string `gorm:"type:varchar(255);not null"` + S3Key string `gorm:"type:varchar(512);not null;index:idx_files_s3_key"` + S3Bucket string `gorm:"type:varchar(255);not null"` + Size int64 `gorm:"default:0"` + ContentType string `gorm:"type:varchar(255);default:'application/octet-stream'"` + OwnerID string `gorm:"type:varchar(36);not null;index:idx_files_owner"` + CreatedAt time.Time `gorm:"autoCreateTime"` + UpdatedAt time.Time `gorm:"autoUpdateTime"` +} + +func (FileMetaPO) TableName() string { return "files" } + +// ShareLinkPO maps to the "share_links" table. +type ShareLinkPO struct { + ID string `gorm:"primaryKey;type:uuid;default:gen_random_uuid()"` + ResourceType string `gorm:"type:varchar(10);not null"` + ResourceID string `gorm:"type:uuid;not null"` + Token string `gorm:"type:varchar(32);not null;uniqueIndex:idx_share_token"` + Password *string `gorm:"type:varchar(255)"` + ExpiresAt *time.Time `gorm:"type:timestamptz"` + DownloadCount int `gorm:"default:0"` + MaxDownloads *int + CreatedBy string `gorm:"type:varchar(36);not null"` + CreatedAt time.Time `gorm:"autoCreateTime"` +} + +func (ShareLinkPO) TableName() string { return "share_links" } diff --git a/internal/data/file_meta_repo.go b/internal/data/file_meta_repo.go new file mode 100644 index 0000000..38acf93 --- /dev/null +++ b/internal/data/file_meta_repo.go @@ -0,0 +1,90 @@ +package data + +import ( + "context" + "fmt" + "time" + + "github.com/go-kratos/kratos/v2/log" +) + +// FileMetaRepo implements file metadata persistence operations using GORM. +type FileMetaRepo struct { + data *Data + log *log.Helper +} + +// NewFileMetaRepo creates a new FileMetaRepo. +func NewFileMetaRepo(data *Data, logger log.Logger) *FileMetaRepo { + return &FileMetaRepo{ + data: data, + log: log.NewHelper(logger), + } +} + +// Create inserts a new file metadata record. +func (r *FileMetaRepo) Create(ctx context.Context, file *FileMetaPO) error { + return r.data.DB(ctx).Create(file).Error +} + +// GetByID retrieves file metadata by ID. Returns nil if not found. +func (r *FileMetaRepo) GetByID(ctx context.Context, id string) (*FileMetaPO, error) { + var file FileMetaPO + err := r.data.DB(ctx).Where("id = ?", id).First(&file).Error + if err != nil { + if isRecordNotFound(err) { + return nil, nil + } + return nil, fmt.Errorf("failed to get file meta: %w", err) + } + return &file, nil +} + +// GetByFolder retrieves all files in a folder, ordered by name. +func (r *FileMetaRepo) GetByFolder(ctx context.Context, folderID string) ([]FileMetaPO, error) { + var files []FileMetaPO + err := r.data.DB(ctx). + Where("folder_id = ?", folderID). + Order("name"). + Find(&files).Error + if err != nil { + return nil, fmt.Errorf("failed to get files by folder: %w", err) + } + return files, nil +} + +// Move updates the folder_id of a file (moves it to another folder). +func (r *FileMetaRepo) Move(ctx context.Context, fileID string, targetFolderID string, ownerID string) error { + result := r.data.DB(ctx).Model(&FileMetaPO{}). + Where("id = ? AND owner_id = ?", fileID, ownerID). + Updates(map[string]interface{}{ + "folder_id": targetFolderID, + "updated_at": gormNow(), + }) + if result.Error != nil { + return fmt.Errorf("failed to move file: %w", result.Error) + } + if result.RowsAffected == 0 { + return fmt.Errorf("file not found or not owned by user") + } + return nil +} + +// Delete removes a file metadata record by ID and ownerID. +func (r *FileMetaRepo) Delete(ctx context.Context, id string, ownerID string) error { + result := r.data.DB(ctx). + Where("id = ? AND owner_id = ?", id, ownerID). + Delete(&FileMetaPO{}) + if result.Error != nil { + return fmt.Errorf("failed to delete file meta: %w", result.Error) + } + if result.RowsAffected == 0 { + return fmt.Errorf("file not found or not owned by user") + } + return nil +} + +// gormNow returns the current UTC time for use in GORM updates. +func gormNow() time.Time { + return time.Now().UTC() +} diff --git a/internal/data/file_repo.go b/internal/data/file_repo.go new file mode 100644 index 0000000..41f12e1 --- /dev/null +++ b/internal/data/file_repo.go @@ -0,0 +1,285 @@ +package data + +import ( + "context" + "fmt" + "io" + "sort" + "time" + + "rag/file-system/internal/conf" + "rag/file-system/internal/pkg/s3errors" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" +) + +const maxContentPreviewSize = 10 * 1024 * 1024 // 10MB + +// Part represents a single uploaded part in a multipart upload. +type Part struct { + ETag string + PartNumber int32 +} + +// FileInfo holds metadata about an S3 object. +type FileInfo struct { + Key string + Size int64 + LastModified time.Time + ETag string +} + +// ListFilesResult holds the result of a paginated list operation. +type ListFilesResult struct { + Files []FileInfo + NextContinuationToken *string +} + +// FileRepo handles all S3 storage operations. +type FileRepo struct { + client *s3.Client + presignClient *s3.PresignClient +} + +// NewFileRepo creates a new FileRepo with an S3 client configured from the provided config. +func NewFileRepo(c *conf.Data) *FileRepo { + s3Conf := c.GetS3() + customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { + return aws.Endpoint{ + URL: s3Conf.GetEndpoint(), + SigningRegion: s3Conf.GetRegion(), + }, nil + }) + + awsCfg, err := config.LoadDefaultConfig(context.TODO(), + config.WithRegion(s3Conf.GetRegion()), + config.WithEndpointResolverWithOptions(customResolver), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider( + s3Conf.GetAccessKey(), + s3Conf.GetSecretKey(), + "", + )), + ) + if err != nil { + panic(fmt.Sprintf("unable to load S3 SDK config: %v", err)) + } + + client := s3.NewFromConfig(awsCfg, func(o *s3.Options) { + o.UsePathStyle = true + }) + + return &FileRepo{ + client: client, + presignClient: s3.NewPresignClient(client), + } +} + +// UploadFile uploads data to the specified bucket and object key. +func (r *FileRepo) UploadFile(ctx context.Context, bucketName string, objectKey string, data io.Reader) error { + _, err := r.client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: data, + }) + return s3errors.Wrap(err) +} + +// DownloadFile downloads an object from S3 and returns its body as a ReadCloser. +func (r *FileRepo) DownloadFile(ctx context.Context, bucketName string, objectKey string) (io.ReadCloser, error) { + resp, err := r.client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + if err != nil { + return nil, s3errors.Wrap(err) + } + return resp.Body, nil +} + +// ListBuckets returns all bucket names. +func (r *FileRepo) ListBuckets(ctx context.Context) ([]string, error) { + resp, err := r.client.ListBuckets(ctx, &s3.ListBucketsInput{}) + if err != nil { + return nil, s3errors.Wrap(err) + } + var buckets []string + for _, b := range resp.Buckets { + if b.Name != nil { + buckets = append(buckets, *b.Name) + } + } + return buckets, nil +} + +// CreateBucket creates a new S3 bucket. +func (r *FileRepo) CreateBucket(ctx context.Context, bucketName string) error { + _, err := r.client.CreateBucket(ctx, &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + return s3errors.Wrap(err) +} + +// DeleteBucket deletes an S3 bucket. +func (r *FileRepo) DeleteBucket(ctx context.Context, bucketName string) error { + _, err := r.client.DeleteBucket(ctx, &s3.DeleteBucketInput{ + Bucket: aws.String(bucketName), + }) + return s3errors.Wrap(err) +} + +// GetFileContent retrieves text file content for preview (e.g., Markdown files). +func (r *FileRepo) GetFileContent(ctx context.Context, bucketName string, objectKey string) (string, error) { + resp, err := r.client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + if err != nil { + return "", s3errors.Wrap(err) + } + defer resp.Body.Close() + + data, err := io.ReadAll(io.LimitReader(resp.Body, maxContentPreviewSize)) + if err != nil { + return "", err + } + if int64(len(data)) >= maxContentPreviewSize { + return "", fmt.Errorf("file too large for content preview (max 10MB)") + } + return string(data), nil +} + +// DeleteFile removes a file from the bucket. +func (r *FileRepo) DeleteFile(ctx context.Context, bucketName string, objectKey string) error { + _, err := r.client.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + return s3errors.Wrap(err) +} + +// ListObjectsV2 lists files with pagination support. +func (r *FileRepo) ListObjectsV2(ctx context.Context, bucketName string, prefix string, maxKeys int32, continuationToken *string) (*ListFilesResult, error) { + input := &s3.ListObjectsV2Input{ + Bucket: aws.String(bucketName), + Prefix: aws.String(prefix), + MaxKeys: aws.Int32(maxKeys), + } + if continuationToken != nil && *continuationToken != "" { + input.ContinuationToken = continuationToken + } + + resp, err := r.client.ListObjectsV2(ctx, input) + if err != nil { + return nil, s3errors.Wrap(err) + } + + files := make([]FileInfo, 0, len(resp.Contents)) + for _, obj := range resp.Contents { + if obj.Key == nil || obj.Size == nil || obj.LastModified == nil || obj.ETag == nil { + continue + } + files = append(files, FileInfo{ + Key: *obj.Key, + Size: *obj.Size, + LastModified: *obj.LastModified, + ETag: *obj.ETag, + }) + } + + return &ListFilesResult{ + Files: files, + NextContinuationToken: resp.NextContinuationToken, + }, nil +} + +// GeneratePresignedURL generates a presigned URL for temporary file access. +func (r *FileRepo) GeneratePresignedURL(ctx context.Context, bucketName string, objectKey string, expiry time.Duration) (string, error) { + presignResult, err := r.presignClient.PresignGetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }, func(opts *s3.PresignOptions) { + opts.Expires = expiry + }) + if err != nil { + return "", s3errors.Wrap(err) + } + return presignResult.URL, nil +} + +// CreateMultipartUpload initializes a multipart upload session. +func (r *FileRepo) CreateMultipartUpload(ctx context.Context, bucketName string, objectKey string) (string, error) { + resp, err := r.client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + if err != nil { + return "", s3errors.Wrap(err) + } + if resp.UploadId == nil { + return "", fmt.Errorf("failed to initialize multipart upload") + } + return *resp.UploadId, nil +} + +// UploadPart uploads a single part of a multipart upload. +func (r *FileRepo) UploadPart(ctx context.Context, bucketName string, objectKey string, uploadId string, partNumber int32, data io.Reader) (string, error) { + resp, err := r.client.UploadPart(ctx, &s3.UploadPartInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: aws.String(uploadId), + PartNumber: aws.Int32(partNumber), + Body: data, + }) + if err != nil { + return "", s3errors.Wrap(err) + } + if resp.ETag == nil { + return "", fmt.Errorf("failed to upload part") + } + return *resp.ETag, nil +} + +// CompleteMultipartUpload assembles all parts to complete the upload. +func (r *FileRepo) CompleteMultipartUpload(ctx context.Context, bucketName string, objectKey string, uploadId string, parts []Part) (string, error) { + sort.Slice(parts, func(i, j int) bool { + return parts[i].PartNumber < parts[j].PartNumber + }) + + completedParts := make([]types.CompletedPart, len(parts)) + for i, p := range parts { + completedParts[i] = types.CompletedPart{ + ETag: aws.String(p.ETag), + PartNumber: aws.Int32(p.PartNumber), + } + } + + resp, err := r.client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: aws.String(uploadId), + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: completedParts, + }, + }) + if err != nil { + return "", s3errors.Wrap(err) + } + if resp.Location == nil { + return "", fmt.Errorf("failed to complete multipart upload") + } + return *resp.Location, nil +} + +// AbortMultipartUpload cancels an in-progress multipart upload. +func (r *FileRepo) AbortMultipartUpload(ctx context.Context, bucketName string, objectKey string, uploadId string) error { + _, err := r.client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: aws.String(uploadId), + }) + return s3errors.Wrap(err) +} diff --git a/internal/data/folder_repo.go b/internal/data/folder_repo.go new file mode 100644 index 0000000..db64a4e --- /dev/null +++ b/internal/data/folder_repo.go @@ -0,0 +1,148 @@ +package data + +import ( + "context" + "fmt" + + "github.com/go-kratos/kratos/v2/log" + "gorm.io/gorm" +) + +// FolderRepo implements folder persistence operations using GORM. +type FolderRepo struct { + data *Data + log *log.Helper +} + +// NewFolderRepo creates a new FolderRepo. +func NewFolderRepo(data *Data, logger log.Logger) *FolderRepo { + return &FolderRepo{ + data: data, + log: log.NewHelper(logger), + } +} + +// FolderWithChildren holds a folder along with its sub-folders and files. +type FolderWithChildren struct { + Folder FolderPO + SubFolders []FolderPO + Files []FileMetaPO +} + +// Create inserts a new folder record. +func (r *FolderRepo) Create(ctx context.Context, folder *FolderPO) error { + return r.data.DB(ctx).Create(folder).Error +} + +// GetByID retrieves a folder by its ID. Returns nil if not found. +func (r *FolderRepo) GetByID(ctx context.Context, id string) (*FolderPO, error) { + var folder FolderPO + err := r.data.DB(ctx).Where("id = ?", id).First(&folder).Error + if err != nil { + if isRecordNotFound(err) { + return nil, nil + } + return nil, fmt.Errorf("failed to get folder: %w", err) + } + return &folder, nil +} + +// GetWithChildren retrieves a folder with its sub-folders and files, filtered by ownerID. +func (r *FolderRepo) GetWithChildren(ctx context.Context, id string, ownerID string) (*FolderWithChildren, error) { + folder, err := r.GetByID(ctx, id) + if err != nil { + return nil, err + } + if folder == nil || folder.OwnerID != ownerID { + return nil, nil + } + + var subFolders []FolderPO + if err := r.data.DB(ctx). + Where("parent_id = ? AND owner_id = ?", id, ownerID). + Order("name"). + Find(&subFolders).Error; err != nil { + return nil, fmt.Errorf("failed to query sub-folders: %w", err) + } + + var files []FileMetaPO + if err := r.data.DB(ctx). + Where("folder_id = ? AND owner_id = ?", id, ownerID). + Order("name"). + Find(&files).Error; err != nil { + return nil, fmt.Errorf("failed to query files: %w", err) + } + + return &FolderWithChildren{ + Folder: *folder, + SubFolders: subFolders, + Files: files, + }, nil +} + +// GetTree retrieves all folders owned by the given ownerID. +func (r *FolderRepo) GetTree(ctx context.Context, ownerID string) ([]FolderPO, error) { + var folders []FolderPO + err := r.data.DB(ctx). + Where("owner_id = ?", ownerID). + Order("name"). + Find(&folders).Error + if err != nil { + return nil, fmt.Errorf("failed to get folder tree: %w", err) + } + return folders, nil +} + +// Update modifies a folder's name and updated_at timestamp. +func (r *FolderRepo) Update(ctx context.Context, folder *FolderPO) error { + result := r.data.DB(ctx).Model(&FolderPO{}). + Where("id = ?", folder.ID). + Updates(map[string]interface{}{ + "name": folder.Name, + "updated_at": folder.UpdatedAt, + }) + if result.Error != nil { + return fmt.Errorf("failed to update folder: %w", result.Error) + } + return nil +} + +// Delete removes a folder by ID and ownerID. Returns error if not found. +func (r *FolderRepo) Delete(ctx context.Context, id string, ownerID string) error { + result := r.data.DB(ctx). + Where("id = ? AND owner_id = ?", id, ownerID). + Delete(&FolderPO{}) + if result.Error != nil { + return fmt.Errorf("failed to delete folder: %w", result.Error) + } + if result.RowsAffected == 0 { + return fmt.Errorf("folder not found or not owned by user") + } + return nil +} + +// GetDescendantFileS3Keys returns all files in a folder and all its descendant folders. +// Uses a recursive CTE to walk the folder tree. +func (r *FolderRepo) GetDescendantFileS3Keys(ctx context.Context, id string, ownerID string) ([]FileMetaPO, error) { + var files []FileMetaPO + // Raw SQL with recursive CTE - GORM doesn't natively support recursive queries + err := r.data.DB(ctx).Raw(` + WITH RECURSIVE descendants AS ( + SELECT id FROM folders WHERE id = ? AND owner_id = ? + UNION + SELECT f.id FROM folders f INNER JOIN descendants d ON f.parent_id = d.id + ) + SELECT fi.id, fi.folder_id, fi.name, fi.s3_key, fi.s3_bucket, fi.size, fi.content_type, fi.owner_id, fi.created_at, fi.updated_at + FROM files fi INNER JOIN descendants d ON fi.folder_id = d.id + WHERE fi.owner_id = ?`, + id, ownerID, ownerID).Scan(&files).Error + if err != nil { + return nil, fmt.Errorf("failed to get descendant file S3 keys: %w", err) + } + return files, nil +} + +// isRecordNotFound checks if the error is a GORM record-not-found error. +func isRecordNotFound(err error) bool { + return err == gorm.ErrRecordNotFound +} diff --git a/internal/data/share_repo.go b/internal/data/share_repo.go new file mode 100644 index 0000000..6e10bce --- /dev/null +++ b/internal/data/share_repo.go @@ -0,0 +1,88 @@ +package data + +import ( + "context" + "fmt" + + "github.com/go-kratos/kratos/v2/log" + "gorm.io/gorm" +) + +// ShareRepo implements share link persistence operations using GORM. +type ShareRepo struct { + data *Data + log *log.Helper +} + +// NewShareRepo creates a new ShareRepo. +func NewShareRepo(data *Data, logger log.Logger) *ShareRepo { + return &ShareRepo{ + data: data, + log: log.NewHelper(logger), + } +} + +// Create inserts a new share link record. +func (r *ShareRepo) Create(ctx context.Context, share *ShareLinkPO) error { + return r.data.DB(ctx).Create(share).Error +} + +// GetByToken retrieves a share link by its token. Returns nil if not found. +func (r *ShareRepo) GetByToken(ctx context.Context, token string) (*ShareLinkPO, error) { + return r.queryOne(ctx, "token = ?", token) +} + +// GetByID retrieves a share link by its ID. Returns nil if not found. +func (r *ShareRepo) GetByID(ctx context.Context, id string) (*ShareLinkPO, error) { + return r.queryOne(ctx, "id = ?", id) +} + +// Delete removes a share link by ID and creator. Returns error if not found. +func (r *ShareRepo) Delete(ctx context.Context, id string, createdBy string) error { + result := r.data.DB(ctx). + Where("id = ? AND created_by = ?", id, createdBy). + Delete(&ShareLinkPO{}) + if result.Error != nil { + return fmt.Errorf("failed to delete share link: %w", result.Error) + } + if result.RowsAffected == 0 { + return fmt.Errorf("share link not found or not owned by user") + } + return nil +} + +// IncrementDownloadCount atomically increments the download count for the given token. +func (r *ShareRepo) IncrementDownloadCount(ctx context.Context, token string) error { + result := r.data.DB(ctx).Model(&ShareLinkPO{}). + Where("token = ?", token). + UpdateColumn("download_count", gorm.Expr("download_count + 1")) + if result.Error != nil { + return fmt.Errorf("failed to increment download count: %w", result.Error) + } + return nil +} + +// ListByResource retrieves all share links for a given resource, ordered by created_at descending. +func (r *ShareRepo) ListByResource(ctx context.Context, resourceType string, resourceID string) ([]ShareLinkPO, error) { + var links []ShareLinkPO + err := r.data.DB(ctx). + Where("resource_type = ? AND resource_id = ?", resourceType, resourceID). + Order("created_at DESC"). + Find(&links).Error + if err != nil { + return nil, fmt.Errorf("failed to list share links: %w", err) + } + return links, nil +} + +func (r *ShareRepo) queryOne(ctx context.Context, query string, args ...interface{}) (*ShareLinkPO, error) { + var share ShareLinkPO + err := r.data.DB(ctx).Where(query, args...).First(&share).Error + if err != nil { + if isRecordNotFound(err) { + return nil, nil + } + return nil, fmt.Errorf("failed to query share link: %w", err) + } + return &share, nil +} diff --git a/internal/infrastructure/database/postgres.go b/internal/infrastructure/database/postgres.go deleted file mode 100644 index bf66b77..0000000 --- a/internal/infrastructure/database/postgres.go +++ /dev/null @@ -1,83 +0,0 @@ -package database - -import ( - "database/sql" - "fmt" - - _ "github.com/jackc/pgx/v5/stdlib" - - "rag/file-system/internal/common" -) - -func NewPostgresDB(databaseURL string) (*sql.DB, error) { - db, err := sql.Open("pgx", databaseURL) - if err != nil { - return nil, fmt.Errorf("failed to open database: %w", err) - } - - if err := db.Ping(); err != nil { - return nil, fmt.Errorf("failed to ping database: %w", err) - } - - db.SetMaxOpenConns(25) - db.SetMaxIdleConns(5) - - common.Logger.Info("connected to PostgreSQL") - return db, nil -} - -func RunMigrations(db *sql.DB) error { - migrations := []string{ - `CREATE TABLE IF NOT EXISTS folders ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - parent_id UUID REFERENCES folders(id) ON DELETE CASCADE, - name VARCHAR(255) NOT NULL, - owner_id VARCHAR(36) NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - UNIQUE(parent_id, name, owner_id) - )`, - `CREATE INDEX IF NOT EXISTS idx_folders_parent ON folders(parent_id)`, - `CREATE INDEX IF NOT EXISTS idx_folders_owner ON folders(owner_id)`, - - `CREATE TABLE IF NOT EXISTS files ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - folder_id UUID REFERENCES folders(id) ON DELETE CASCADE, - name VARCHAR(255) NOT NULL, - s3_key VARCHAR(512) NOT NULL, - s3_bucket VARCHAR(255) NOT NULL, - size BIGINT NOT NULL DEFAULT 0, - content_type VARCHAR(255) NOT NULL DEFAULT 'application/octet-stream', - owner_id VARCHAR(36) NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT now() - )`, - `CREATE INDEX IF NOT EXISTS idx_files_folder ON files(folder_id)`, - `CREATE INDEX IF NOT EXISTS idx_files_owner ON files(owner_id)`, - `CREATE INDEX IF NOT EXISTS idx_files_s3_key ON files(s3_key)`, - - `CREATE TABLE IF NOT EXISTS share_links ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - resource_type VARCHAR(10) NOT NULL, - resource_id UUID NOT NULL, - token VARCHAR(32) NOT NULL UNIQUE, - password VARCHAR(255), - expires_at TIMESTAMPTZ, - download_count INT NOT NULL DEFAULT 0, - max_downloads INT, - created_by VARCHAR(36) NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now() - )`, - `CREATE INDEX IF NOT EXISTS idx_share_token ON share_links(token)`, - `CREATE INDEX IF NOT EXISTS idx_share_resource ON share_links(resource_type, resource_id)`, - } - - for _, m := range migrations { - if _, err := db.Exec(m); err != nil { - return fmt.Errorf("migration failed: %w\nSQL: %s", err, m) - } - } - - common.Logger.Info("database migrations completed") - return nil -} diff --git a/internal/infrastructure/repository/file_meta_repo_impl.go b/internal/infrastructure/repository/file_meta_repo_impl.go deleted file mode 100644 index aca4d73..0000000 --- a/internal/infrastructure/repository/file_meta_repo_impl.go +++ /dev/null @@ -1,91 +0,0 @@ -package repository - -import ( - "context" - "database/sql" - "fmt" - - "rag/file-system/internal/domain/model" -) - -type FileMetaRepoImpl struct { - db *sql.DB -} - -func NewFileMetaRepository(db *sql.DB) *FileMetaRepoImpl { - return &FileMetaRepoImpl{db: db} -} - -func (r *FileMetaRepoImpl) Create(ctx context.Context, file *model.FileMeta) error { - _, err := r.db.ExecContext(ctx, - `INSERT INTO files (id, folder_id, name, s3_key, s3_bucket, size, content_type, owner_id, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, - file.ID, file.FolderID, file.Name, file.S3Key, file.S3Bucket, file.Size, file.ContentType, file.OwnerID, file.CreatedAt, file.UpdatedAt) - if err != nil { - return fmt.Errorf("failed to create file meta: %w", err) - } - return nil -} - -func (r *FileMetaRepoImpl) GetByID(ctx context.Context, id string) (*model.FileMeta, error) { - var f model.FileMeta - err := r.db.QueryRowContext(ctx, - `SELECT id, folder_id, name, s3_key, s3_bucket, size, content_type, owner_id, created_at, updated_at FROM files WHERE id = $1`, id). - Scan(&f.ID, &f.FolderID, &f.Name, &f.S3Key, &f.S3Bucket, &f.Size, &f.ContentType, &f.OwnerID, &f.CreatedAt, &f.UpdatedAt) - if err == sql.ErrNoRows { - return nil, nil - } - if err != nil { - return nil, fmt.Errorf("failed to get file meta: %w", err) - } - return &f, nil -} - -func (r *FileMetaRepoImpl) GetByFolder(ctx context.Context, folderID string) ([]model.FileMeta, error) { - rows, err := r.db.QueryContext(ctx, - `SELECT id, folder_id, name, s3_key, s3_bucket, size, content_type, owner_id, created_at, updated_at FROM files WHERE folder_id = $1 ORDER BY name`, folderID) - if err != nil { - return nil, fmt.Errorf("failed to get files by folder: %w", err) - } - defer rows.Close() - - return scanFiles(rows) -} - -func (r *FileMetaRepoImpl) Move(ctx context.Context, fileID string, targetFolderID string, ownerID string) error { - result, err := r.db.ExecContext(ctx, - `UPDATE files SET folder_id = $1, updated_at = now() WHERE id = $2 AND owner_id = $3`, - targetFolderID, fileID, ownerID) - if err != nil { - return fmt.Errorf("failed to move file: %w", err) - } - rows, _ := result.RowsAffected() - if rows == 0 { - return fmt.Errorf("file not found or not owned by user") - } - return nil -} - -func (r *FileMetaRepoImpl) Delete(ctx context.Context, id string, ownerID string) error { - result, err := r.db.ExecContext(ctx, `DELETE FROM files WHERE id = $1 AND owner_id = $2`, id, ownerID) - if err != nil { - return fmt.Errorf("failed to delete file meta: %w", err) - } - rows, _ := result.RowsAffected() - if rows == 0 { - return fmt.Errorf("file not found or not owned by user") - } - return nil -} - -func scanFiles(rows *sql.Rows) ([]model.FileMeta, error) { - var files []model.FileMeta - for rows.Next() { - var f model.FileMeta - if err := rows.Scan(&f.ID, &f.FolderID, &f.Name, &f.S3Key, &f.S3Bucket, &f.Size, &f.ContentType, &f.OwnerID, &f.CreatedAt, &f.UpdatedAt); err != nil { - return nil, fmt.Errorf("failed to scan file: %w", err) - } - files = append(files, f) - } - return files, rows.Err() -} diff --git a/internal/infrastructure/repository/folder_repo_impl.go b/internal/infrastructure/repository/folder_repo_impl.go deleted file mode 100644 index aed3e85..0000000 --- a/internal/infrastructure/repository/folder_repo_impl.go +++ /dev/null @@ -1,154 +0,0 @@ -package repository - -import ( - "context" - "database/sql" - "fmt" - - "rag/file-system/internal/domain/model" -) - -type FolderRepoImpl struct { - db *sql.DB -} - -func NewFolderRepository(db *sql.DB) *FolderRepoImpl { - return &FolderRepoImpl{db: db} -} - -func (r *FolderRepoImpl) Create(ctx context.Context, folder *model.Folder) error { - _, err := r.db.ExecContext(ctx, - `INSERT INTO folders (id, parent_id, name, owner_id, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6)`, - folder.ID, folder.ParentID, folder.Name, folder.OwnerID, folder.CreatedAt, folder.UpdatedAt) - if err != nil { - return fmt.Errorf("failed to create folder: %w", err) - } - return nil -} - -func (r *FolderRepoImpl) GetByID(ctx context.Context, id string) (*model.Folder, error) { - var f model.Folder - var parentID sql.NullString - err := r.db.QueryRowContext(ctx, - `SELECT id, parent_id, name, owner_id, created_at, updated_at FROM folders WHERE id = $1`, id). - Scan(&f.ID, &parentID, &f.Name, &f.OwnerID, &f.CreatedAt, &f.UpdatedAt) - if err == sql.ErrNoRows { - return nil, nil - } - if err != nil { - return nil, fmt.Errorf("failed to get folder: %w", err) - } - if parentID.Valid { - f.ParentID = &parentID.String - } - return &f, nil -} - -func (r *FolderRepoImpl) GetWithChildren(ctx context.Context, id string, ownerID string) (*model.FolderWithChildren, error) { - folder, err := r.GetByID(ctx, id) - if err != nil { - return nil, err - } - if folder == nil || folder.OwnerID != ownerID { - return nil, nil - } - - subFolders, err := r.queryFolders(ctx, - `SELECT id, parent_id, name, owner_id, created_at, updated_at FROM folders WHERE parent_id = $1 AND owner_id = $2 ORDER BY name`, id, ownerID) - if err != nil { - return nil, err - } - - files, err := r.queryFiles(ctx, - `SELECT id, folder_id, name, s3_key, s3_bucket, size, content_type, owner_id, created_at, updated_at FROM files WHERE folder_id = $1 AND owner_id = $2 ORDER BY name`, id, ownerID) - if err != nil { - return nil, err - } - - return &model.FolderWithChildren{ - Folder: *folder, - SubFolders: subFolders, - Files: files, - }, nil -} - -func (r *FolderRepoImpl) GetTree(ctx context.Context, ownerID string) ([]model.Folder, error) { - return r.queryFolders(ctx, - `SELECT id, parent_id, name, owner_id, created_at, updated_at FROM folders WHERE owner_id = $1 ORDER BY name`, ownerID) -} - -func (r *FolderRepoImpl) Update(ctx context.Context, folder *model.Folder) error { - _, err := r.db.ExecContext(ctx, - `UPDATE folders SET name = $1, updated_at = $2 WHERE id = $3`, - folder.Name, folder.UpdatedAt, folder.ID) - if err != nil { - return fmt.Errorf("failed to update folder: %w", err) - } - return nil -} - -func (r *FolderRepoImpl) Delete(ctx context.Context, id string, ownerID string) error { - result, err := r.db.ExecContext(ctx, `DELETE FROM folders WHERE id = $1 AND owner_id = $2`, id, ownerID) - if err != nil { - return fmt.Errorf("failed to delete folder: %w", err) - } - rows, _ := result.RowsAffected() - if rows == 0 { - return fmt.Errorf("folder not found or not owned by user") - } - return nil -} - -func (r *FolderRepoImpl) GetDescendantFileS3Keys(ctx context.Context, id string, ownerID string) ([]model.FileMeta, error) { - query := ` - WITH RECURSIVE descendants AS ( - SELECT id FROM folders WHERE id = $1 AND owner_id = $2 - UNION - SELECT f.id FROM folders f INNER JOIN descendants d ON f.parent_id = d.id - ) - SELECT fi.id, fi.folder_id, fi.name, fi.s3_key, fi.s3_bucket, fi.size, fi.content_type, fi.owner_id, fi.created_at, fi.updated_at - FROM files fi INNER JOIN descendants d ON fi.folder_id = d.id - WHERE fi.owner_id = $2` - return r.queryFiles(ctx, query, id, ownerID) -} - -func (r *FolderRepoImpl) queryFolders(ctx context.Context, query string, args ...interface{}) ([]model.Folder, error) { - rows, err := r.db.QueryContext(ctx, query, args...) - if err != nil { - return nil, fmt.Errorf("failed to query folders: %w", err) - } - defer rows.Close() - - var folders []model.Folder - for rows.Next() { - var f model.Folder - var parentID sql.NullString - if err := rows.Scan(&f.ID, &parentID, &f.Name, &f.OwnerID, &f.CreatedAt, &f.UpdatedAt); err != nil { - return nil, fmt.Errorf("failed to scan folder: %w", err) - } - if parentID.Valid { - f.ParentID = &parentID.String - } - folders = append(folders, f) - } - return folders, rows.Err() -} - -func (r *FolderRepoImpl) queryFiles(ctx context.Context, query string, args ...interface{}) ([]model.FileMeta, error) { - rows, err := r.db.QueryContext(ctx, query, args...) - if err != nil { - return nil, fmt.Errorf("failed to query files: %w", err) - } - defer rows.Close() - - var files []model.FileMeta - for rows.Next() { - var f model.FileMeta - if err := rows.Scan(&f.ID, &f.FolderID, &f.Name, &f.S3Key, &f.S3Bucket, &f.Size, &f.ContentType, &f.OwnerID, &f.CreatedAt, &f.UpdatedAt); err != nil { - return nil, fmt.Errorf("failed to scan file: %w", err) - } - files = append(files, f) - } - return files, rows.Err() -} diff --git a/internal/infrastructure/repository/share_repo_impl.go b/internal/infrastructure/repository/share_repo_impl.go deleted file mode 100644 index 5ac2653..0000000 --- a/internal/infrastructure/repository/share_repo_impl.go +++ /dev/null @@ -1,90 +0,0 @@ -package repository - -import ( - "context" - "database/sql" - "fmt" - - "rag/file-system/internal/domain/model" -) - -type ShareRepoImpl struct { - db *sql.DB -} - -func NewShareRepository(db *sql.DB) *ShareRepoImpl { - return &ShareRepoImpl{db: db} -} - -func (r *ShareRepoImpl) Create(ctx context.Context, share *model.ShareLink) error { - _, err := r.db.ExecContext(ctx, - `INSERT INTO share_links (id, resource_type, resource_id, token, password, expires_at, download_count, max_downloads, created_by, created_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, - share.ID, share.ResourceType, share.ResourceID, share.Token, share.Password, share.ExpiresAt, - share.DownloadCount, share.MaxDownloads, share.CreatedBy, share.CreatedAt) - if err != nil { - return fmt.Errorf("failed to create share link: %w", err) - } - return nil -} - -func (r *ShareRepoImpl) GetByToken(ctx context.Context, token string) (*model.ShareLink, error) { - return r.queryOne(ctx, `SELECT id, resource_type, resource_id, token, password, expires_at, download_count, max_downloads, created_by, created_at FROM share_links WHERE token = $1`, token) -} - -func (r *ShareRepoImpl) GetByID(ctx context.Context, id string) (*model.ShareLink, error) { - return r.queryOne(ctx, `SELECT id, resource_type, resource_id, token, password, expires_at, download_count, max_downloads, created_by, created_at FROM share_links WHERE id = $1`, id) -} - -func (r *ShareRepoImpl) Delete(ctx context.Context, id string, createdBy string) error { - result, err := r.db.ExecContext(ctx, `DELETE FROM share_links WHERE id = $1 AND created_by = $2`, id, createdBy) - if err != nil { - return fmt.Errorf("failed to delete share link: %w", err) - } - rows, _ := result.RowsAffected() - if rows == 0 { - return fmt.Errorf("share link not found or not owned by user") - } - return nil -} - -func (r *ShareRepoImpl) IncrementDownloadCount(ctx context.Context, token string) error { - _, err := r.db.ExecContext(ctx, `UPDATE share_links SET download_count = download_count + 1 WHERE token = $1`, token) - if err != nil { - return fmt.Errorf("failed to increment download count: %w", err) - } - return nil -} - -func (r *ShareRepoImpl) ListByResource(ctx context.Context, resourceType string, resourceID string) ([]model.ShareLink, error) { - rows, err := r.db.QueryContext(ctx, - `SELECT id, resource_type, resource_id, token, password, expires_at, download_count, max_downloads, created_by, created_at FROM share_links WHERE resource_type = $1 AND resource_id = $2 ORDER BY created_at DESC`, - resourceType, resourceID) - if err != nil { - return nil, fmt.Errorf("failed to list share links: %w", err) - } - defer rows.Close() - - var links []model.ShareLink - for rows.Next() { - var s model.ShareLink - if err := rows.Scan(&s.ID, &s.ResourceType, &s.ResourceID, &s.Token, &s.Password, &s.ExpiresAt, &s.DownloadCount, &s.MaxDownloads, &s.CreatedBy, &s.CreatedAt); err != nil { - return nil, fmt.Errorf("failed to scan share link: %w", err) - } - links = append(links, s) - } - return links, rows.Err() -} - -func (r *ShareRepoImpl) queryOne(ctx context.Context, query string, args ...interface{}) (*model.ShareLink, error) { - var s model.ShareLink - err := r.db.QueryRowContext(ctx, query, args...). - Scan(&s.ID, &s.ResourceType, &s.ResourceID, &s.Token, &s.Password, &s.ExpiresAt, &s.DownloadCount, &s.MaxDownloads, &s.CreatedBy, &s.CreatedAt) - if err == sql.ErrNoRows { - return nil, nil - } - if err != nil { - return nil, fmt.Errorf("failed to query share link: %w", err) - } - return &s, nil -} diff --git a/internal/infrastructure/s3/client.go b/internal/infrastructure/s3/client.go deleted file mode 100644 index d0919e8..0000000 --- a/internal/infrastructure/s3/client.go +++ /dev/null @@ -1,56 +0,0 @@ -package s3 - -import ( - "context" - "rag/file-system/internal/common" - "log" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/service/s3" -) - -type RustFSClient struct { - client *s3.Client - presignClient *s3.PresignClient -} - -func NewRustFSClient(cfg *common.Config) *RustFSClient { - customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { - return aws.Endpoint{ - URL: cfg.RustFSEndpoint, - SigningRegion: cfg.RustFSRegion, - }, nil - }) - - awsCfg, err := config.LoadDefaultConfig(context.TODO(), - config.WithRegion(cfg.RustFSRegion), - config.WithEndpointResolverWithOptions(customResolver), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider( - cfg.RustFSAccessKeyID, - cfg.RustFSSecretAccessKey, - "", - )), - ) - if err != nil { - log.Fatalf("unable to load SDK config, %v", err) - } - - client := s3.NewFromConfig(awsCfg, func(o *s3.Options) { - o.UsePathStyle = true - }) - - return &RustFSClient{ - client: client, - presignClient: s3.NewPresignClient(client), - } -} - -func (c *RustFSClient) S3Client() *s3.Client { - return c.client -} - -func (c *RustFSClient) PresignClient() *s3.PresignClient { - return c.presignClient -} diff --git a/internal/infrastructure/s3/file_repository_impl.go b/internal/infrastructure/s3/file_repository_impl.go deleted file mode 100644 index d6713fe..0000000 --- a/internal/infrastructure/s3/file_repository_impl.go +++ /dev/null @@ -1,225 +0,0 @@ -package s3 - -import ( - "context" - "rag/file-system/internal/common" - "rag/file-system/internal/domain/repository" - "io" - "sort" - "time" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/aws/aws-sdk-go-v2/service/s3/types" -) - -const maxContentPreviewSize = 10 * 1024 * 1024 // 10MB - -type S3FileRepository struct { - client *RustFSClient -} - -func NewS3FileRepository(client *RustFSClient) repository.FileRepository { - return &S3FileRepository{client: client} -} - -func (r *S3FileRepository) UploadFile(ctx context.Context, bucketName string, objectKey string, data io.Reader) error { - _, err := r.client.S3Client().PutObject(ctx, &s3.PutObjectInput{ - Bucket: aws.String(bucketName), - Key: aws.String(objectKey), - Body: data, - }) - return common.WrapS3Error(err) -} - -func (r *S3FileRepository) DownloadFile(ctx context.Context, bucketName string, objectKey string) (io.ReadCloser, error) { - resp, err := r.client.S3Client().GetObject(ctx, &s3.GetObjectInput{ - Bucket: aws.String(bucketName), - Key: aws.String(objectKey), - }) - if err != nil { - return nil, common.WrapS3Error(err) - } - return resp.Body, nil -} - -func (r *S3FileRepository) ListBuckets(ctx context.Context) ([]string, error) { - resp, err := r.client.S3Client().ListBuckets(ctx, &s3.ListBucketsInput{}) - if err != nil { - return nil, common.WrapS3Error(err) - } - var buckets []string - for _, b := range resp.Buckets { - if b.Name != nil { - buckets = append(buckets, *b.Name) - } - } - return buckets, nil -} - -func (r *S3FileRepository) CreateBucket(ctx context.Context, bucketName string) error { - _, err := r.client.S3Client().CreateBucket(ctx, &s3.CreateBucketInput{ - Bucket: aws.String(bucketName), - }) - return common.WrapS3Error(err) -} - -func (r *S3FileRepository) DeleteBucket(ctx context.Context, bucketName string) error { - _, err := r.client.S3Client().DeleteBucket(ctx, &s3.DeleteBucketInput{ - Bucket: aws.String(bucketName), - }) - return common.WrapS3Error(err) -} - -// GetFileContent retrieves text file content for preview (e.g., Markdown files) -func (r *S3FileRepository) GetFileContent(ctx context.Context, bucketName string, objectKey string) (string, error) { - resp, err := r.client.S3Client().GetObject(ctx, &s3.GetObjectInput{ - Bucket: aws.String(bucketName), - Key: aws.String(objectKey), - }) - if err != nil { - return "", common.WrapS3Error(err) - } - defer resp.Body.Close() - - data, err := io.ReadAll(io.LimitReader(resp.Body, maxContentPreviewSize)) - if err != nil { - return "", err - } - if int64(len(data)) >= maxContentPreviewSize { - return "", common.NewBusinessException("file too large for content preview (max 10MB)") - } - return string(data), nil -} - -// DeleteFile removes a file from the bucket -func (r *S3FileRepository) DeleteFile(ctx context.Context, bucketName string, objectKey string) error { - _, err := r.client.S3Client().DeleteObject(ctx, &s3.DeleteObjectInput{ - Bucket: aws.String(bucketName), - Key: aws.String(objectKey), - }) - return common.WrapS3Error(err) -} - -// ListObjectsV2 lists files with pagination support -func (r *S3FileRepository) ListObjectsV2(ctx context.Context, bucketName string, prefix string, maxKeys int32, continuationToken *string) (*repository.ListFilesResult, error) { - input := &s3.ListObjectsV2Input{ - Bucket: aws.String(bucketName), - Prefix: aws.String(prefix), - MaxKeys: aws.Int32(maxKeys), - } - if continuationToken != nil && *continuationToken != "" { - input.ContinuationToken = continuationToken - } - - resp, err := r.client.S3Client().ListObjectsV2(ctx, input) - if err != nil { - return nil, common.WrapS3Error(err) - } - - files := make([]repository.FileInfo, 0, len(resp.Contents)) - for _, obj := range resp.Contents { - if obj.Key == nil || obj.Size == nil || obj.LastModified == nil || obj.ETag == nil { - continue - } - files = append(files, repository.FileInfo{ - Key: *obj.Key, - Size: *obj.Size, - LastModified: *obj.LastModified, - ETag: *obj.ETag, - }) - } - - return &repository.ListFilesResult{ - Files: files, - NextContinuationToken: resp.NextContinuationToken, - }, nil -} - -// GeneratePresignedURL generates a presigned URL for temporary file access -func (r *S3FileRepository) GeneratePresignedURL(ctx context.Context, bucketName string, objectKey string, expiry time.Duration) (string, error) { - presignResult, err := r.client.PresignClient().PresignGetObject(ctx, &s3.GetObjectInput{ - Bucket: aws.String(bucketName), - Key: aws.String(objectKey), - }, func(opts *s3.PresignOptions) { - opts.Expires = expiry - }) - if err != nil { - return "", common.WrapS3Error(err) - } - return presignResult.URL, nil -} - -// CreateMultipartUpload initializes a multipart upload session -func (r *S3FileRepository) CreateMultipartUpload(ctx context.Context, bucketName string, objectKey string) (string, error) { - resp, err := r.client.S3Client().CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ - Bucket: aws.String(bucketName), - Key: aws.String(objectKey), - }) - if err != nil { - return "", common.WrapS3Error(err) - } - if resp.UploadId == nil { - return "", common.NewBusinessException("failed to initialize multipart upload") - } - return *resp.UploadId, nil -} - -// UploadPart uploads a single part of a multipart upload -func (r *S3FileRepository) UploadPart(ctx context.Context, bucketName string, objectKey string, uploadId string, partNumber int32, data io.Reader) (string, error) { - resp, err := r.client.S3Client().UploadPart(ctx, &s3.UploadPartInput{ - Bucket: aws.String(bucketName), - Key: aws.String(objectKey), - UploadId: aws.String(uploadId), - PartNumber: aws.Int32(partNumber), - Body: data, - }) - if err != nil { - return "", common.WrapS3Error(err) - } - if resp.ETag == nil { - return "", common.NewBusinessException("failed to upload part") - } - return *resp.ETag, nil -} - -// CompleteMultipartUpload assembles all parts to complete the upload -func (r *S3FileRepository) CompleteMultipartUpload(ctx context.Context, bucketName string, objectKey string, uploadId string, parts []common.Part) (string, error) { - sort.Slice(parts, func(i, j int) bool { - return parts[i].PartNumber < parts[j].PartNumber - }) - - completedParts := make([]types.CompletedPart, len(parts)) - for i, p := range parts { - completedParts[i] = types.CompletedPart{ - ETag: aws.String(p.ETag), - PartNumber: aws.Int32(p.PartNumber), - } - } - - resp, err := r.client.S3Client().CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ - Bucket: aws.String(bucketName), - Key: aws.String(objectKey), - UploadId: aws.String(uploadId), - MultipartUpload: &types.CompletedMultipartUpload{ - Parts: completedParts, - }, - }) - if err != nil { - return "", common.WrapS3Error(err) - } - if resp.Location == nil { - return "", common.NewBusinessException("failed to complete multipart upload") - } - return *resp.Location, nil -} - -// AbortMultipartUpload cancels an in-progress multipart upload -func (r *S3FileRepository) AbortMultipartUpload(ctx context.Context, bucketName string, objectKey string, uploadId string) error { - _, err := r.client.S3Client().AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ - Bucket: aws.String(bucketName), - Key: aws.String(objectKey), - UploadId: aws.String(uploadId), - }) - return common.WrapS3Error(err) -}