feat: add data layer with GORM models, S3 repo, PG repos

Replace old infrastructure layer with Kratos-style data layer:
- data.go: GORM connection, transaction support, Wire ProviderSet, PO models
- file_repo.go: All 12 S3 operations (upload, download, multipart, presign, buckets)
- folder_repo.go: GORM queries including recursive CTE for descendant files
- file_meta_repo.go: CRUD + move operations for file metadata
- share_repo.go: CRUD + increment download count for share links

Deleted old infrastructure/database, infrastructure/repository, infrastructure/s3.
Kept infrastructure/grpc for later integration.
This commit is contained in:
向宁 2026-05-25 12:57:42 +08:00
parent 7faddfed05
commit bcd637387a
11 changed files with 731 additions and 699 deletions

120
internal/data/data.go Normal file
View File

@ -0,0 +1,120 @@
package data
import (
"context"
"time"
"rag/file-system/internal/conf"
"github.com/go-kratos/kratos/v2/log"
"github.com/google/wire"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
// Data holds the GORM database connection and provides transaction support.
type Data struct {
db *gorm.DB
log *log.Helper
}
// contextTxKey is the context key for storing the current transaction.
type contextTxKey struct{}
// NewData creates a new Data instance with GORM connected to PostgreSQL.
func NewData(c *conf.Data, logger log.Logger) (*Data, func(), error) {
helper := log.NewHelper(logger)
db, err := gorm.Open(postgres.Open(c.Database.Source), &gorm.Config{})
if err != nil {
return nil, nil, err
}
sqlDB, err := db.DB()
if err != nil {
return nil, nil, err
}
sqlDB.SetMaxOpenConns(25)
sqlDB.SetMaxIdleConns(5)
if err := db.AutoMigrate(&FolderPO{}, &FileMetaPO{}, &ShareLinkPO{}); err != nil {
return nil, nil, err
}
helper.Info("connected to PostgreSQL via GORM")
cleanup := func() {
sqlDB.Close()
}
return &Data{db: db, log: helper}, cleanup, nil
}
// DB returns the *gorm.DB for the given context. If a transaction is active
// in the context, it returns the transaction DB; otherwise the global DB.
func (d *Data) DB(ctx context.Context) *gorm.DB {
tx, ok := ctx.Value(contextTxKey{}).(*gorm.DB)
if ok {
return tx
}
return d.db.WithContext(ctx)
}
// Transaction is the interface for executing operations within a database transaction.
type Transaction interface {
InTx(ctx context.Context, fn func(ctx context.Context) error) error
}
// InTx executes fn inside a database transaction.
func (d *Data) InTx(ctx context.Context, fn func(ctx context.Context) error) error {
return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
ctx = context.WithValue(ctx, contextTxKey{}, tx)
return fn(ctx)
})
}
// ProviderSet is the Wire provider set for the data layer.
var ProviderSet = wire.NewSet(NewData, NewFileRepo, NewFolderRepo, NewFileMetaRepo, NewShareRepo)
// --- GORM Models (Persistence Objects) ---
// FolderPO maps to the "folders" table.
type FolderPO struct {
ID string `gorm:"primaryKey;type:uuid;default:gen_random_uuid()"`
ParentID *string `gorm:"type:uuid;index:idx_folders_parent"`
Name string `gorm:"type:varchar(255);not null"`
OwnerID string `gorm:"type:varchar(36);not null;index:idx_folders_owner"`
CreatedAt time.Time `gorm:"autoCreateTime"`
UpdatedAt time.Time `gorm:"autoUpdateTime"`
}
func (FolderPO) TableName() string { return "folders" }
// FileMetaPO maps to the "files" table.
type FileMetaPO struct {
ID string `gorm:"primaryKey;type:uuid;default:gen_random_uuid()"`
FolderID string `gorm:"type:uuid;index:idx_files_folder"`
Name string `gorm:"type:varchar(255);not null"`
S3Key string `gorm:"type:varchar(512);not null;index:idx_files_s3_key"`
S3Bucket string `gorm:"type:varchar(255);not null"`
Size int64 `gorm:"default:0"`
ContentType string `gorm:"type:varchar(255);default:'application/octet-stream'"`
OwnerID string `gorm:"type:varchar(36);not null;index:idx_files_owner"`
CreatedAt time.Time `gorm:"autoCreateTime"`
UpdatedAt time.Time `gorm:"autoUpdateTime"`
}
func (FileMetaPO) TableName() string { return "files" }
// ShareLinkPO maps to the "share_links" table.
type ShareLinkPO struct {
ID string `gorm:"primaryKey;type:uuid;default:gen_random_uuid()"`
ResourceType string `gorm:"type:varchar(10);not null"`
ResourceID string `gorm:"type:uuid;not null"`
Token string `gorm:"type:varchar(32);not null;uniqueIndex:idx_share_token"`
Password *string `gorm:"type:varchar(255)"`
ExpiresAt *time.Time `gorm:"type:timestamptz"`
DownloadCount int `gorm:"default:0"`
MaxDownloads *int
CreatedBy string `gorm:"type:varchar(36);not null"`
CreatedAt time.Time `gorm:"autoCreateTime"`
}
func (ShareLinkPO) TableName() string { return "share_links" }

View File

@ -0,0 +1,90 @@
package data
import (
"context"
"fmt"
"time"
"github.com/go-kratos/kratos/v2/log"
)
// FileMetaRepo implements file metadata persistence operations using GORM.
type FileMetaRepo struct {
data *Data
log *log.Helper
}
// NewFileMetaRepo creates a new FileMetaRepo.
func NewFileMetaRepo(data *Data, logger log.Logger) *FileMetaRepo {
return &FileMetaRepo{
data: data,
log: log.NewHelper(logger),
}
}
// Create inserts a new file metadata record.
func (r *FileMetaRepo) Create(ctx context.Context, file *FileMetaPO) error {
return r.data.DB(ctx).Create(file).Error
}
// GetByID retrieves file metadata by ID. Returns nil if not found.
func (r *FileMetaRepo) GetByID(ctx context.Context, id string) (*FileMetaPO, error) {
var file FileMetaPO
err := r.data.DB(ctx).Where("id = ?", id).First(&file).Error
if err != nil {
if isRecordNotFound(err) {
return nil, nil
}
return nil, fmt.Errorf("failed to get file meta: %w", err)
}
return &file, nil
}
// GetByFolder retrieves all files in a folder, ordered by name.
func (r *FileMetaRepo) GetByFolder(ctx context.Context, folderID string) ([]FileMetaPO, error) {
var files []FileMetaPO
err := r.data.DB(ctx).
Where("folder_id = ?", folderID).
Order("name").
Find(&files).Error
if err != nil {
return nil, fmt.Errorf("failed to get files by folder: %w", err)
}
return files, nil
}
// Move updates the folder_id of a file (moves it to another folder).
func (r *FileMetaRepo) Move(ctx context.Context, fileID string, targetFolderID string, ownerID string) error {
result := r.data.DB(ctx).Model(&FileMetaPO{}).
Where("id = ? AND owner_id = ?", fileID, ownerID).
Updates(map[string]interface{}{
"folder_id": targetFolderID,
"updated_at": gormNow(),
})
if result.Error != nil {
return fmt.Errorf("failed to move file: %w", result.Error)
}
if result.RowsAffected == 0 {
return fmt.Errorf("file not found or not owned by user")
}
return nil
}
// Delete removes a file metadata record by ID and ownerID.
func (r *FileMetaRepo) Delete(ctx context.Context, id string, ownerID string) error {
result := r.data.DB(ctx).
Where("id = ? AND owner_id = ?", id, ownerID).
Delete(&FileMetaPO{})
if result.Error != nil {
return fmt.Errorf("failed to delete file meta: %w", result.Error)
}
if result.RowsAffected == 0 {
return fmt.Errorf("file not found or not owned by user")
}
return nil
}
// gormNow returns the current UTC time for use in GORM updates.
func gormNow() time.Time {
return time.Now().UTC()
}

285
internal/data/file_repo.go Normal file
View File

@ -0,0 +1,285 @@
package data
import (
"context"
"fmt"
"io"
"sort"
"time"
"rag/file-system/internal/conf"
"rag/file-system/internal/pkg/s3errors"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
)
const maxContentPreviewSize = 10 * 1024 * 1024 // 10MB
// Part represents a single uploaded part in a multipart upload.
type Part struct {
ETag string
PartNumber int32
}
// FileInfo holds metadata about an S3 object.
type FileInfo struct {
Key string
Size int64
LastModified time.Time
ETag string
}
// ListFilesResult holds the result of a paginated list operation.
type ListFilesResult struct {
Files []FileInfo
NextContinuationToken *string
}
// FileRepo handles all S3 storage operations.
type FileRepo struct {
client *s3.Client
presignClient *s3.PresignClient
}
// NewFileRepo creates a new FileRepo with an S3 client configured from the provided config.
func NewFileRepo(c *conf.Data) *FileRepo {
s3Conf := c.GetS3()
customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{
URL: s3Conf.GetEndpoint(),
SigningRegion: s3Conf.GetRegion(),
}, nil
})
awsCfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithRegion(s3Conf.GetRegion()),
config.WithEndpointResolverWithOptions(customResolver),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
s3Conf.GetAccessKey(),
s3Conf.GetSecretKey(),
"",
)),
)
if err != nil {
panic(fmt.Sprintf("unable to load S3 SDK config: %v", err))
}
client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
o.UsePathStyle = true
})
return &FileRepo{
client: client,
presignClient: s3.NewPresignClient(client),
}
}
// UploadFile uploads data to the specified bucket and object key.
func (r *FileRepo) UploadFile(ctx context.Context, bucketName string, objectKey string, data io.Reader) error {
_, err := r.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
Body: data,
})
return s3errors.Wrap(err)
}
// DownloadFile downloads an object from S3 and returns its body as a ReadCloser.
func (r *FileRepo) DownloadFile(ctx context.Context, bucketName string, objectKey string) (io.ReadCloser, error) {
resp, err := r.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
})
if err != nil {
return nil, s3errors.Wrap(err)
}
return resp.Body, nil
}
// ListBuckets returns all bucket names.
func (r *FileRepo) ListBuckets(ctx context.Context) ([]string, error) {
resp, err := r.client.ListBuckets(ctx, &s3.ListBucketsInput{})
if err != nil {
return nil, s3errors.Wrap(err)
}
var buckets []string
for _, b := range resp.Buckets {
if b.Name != nil {
buckets = append(buckets, *b.Name)
}
}
return buckets, nil
}
// CreateBucket creates a new S3 bucket.
func (r *FileRepo) CreateBucket(ctx context.Context, bucketName string) error {
_, err := r.client.CreateBucket(ctx, &s3.CreateBucketInput{
Bucket: aws.String(bucketName),
})
return s3errors.Wrap(err)
}
// DeleteBucket deletes an S3 bucket.
func (r *FileRepo) DeleteBucket(ctx context.Context, bucketName string) error {
_, err := r.client.DeleteBucket(ctx, &s3.DeleteBucketInput{
Bucket: aws.String(bucketName),
})
return s3errors.Wrap(err)
}
// GetFileContent retrieves text file content for preview (e.g., Markdown files).
func (r *FileRepo) GetFileContent(ctx context.Context, bucketName string, objectKey string) (string, error) {
resp, err := r.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
})
if err != nil {
return "", s3errors.Wrap(err)
}
defer resp.Body.Close()
data, err := io.ReadAll(io.LimitReader(resp.Body, maxContentPreviewSize))
if err != nil {
return "", err
}
if int64(len(data)) >= maxContentPreviewSize {
return "", fmt.Errorf("file too large for content preview (max 10MB)")
}
return string(data), nil
}
// DeleteFile removes a file from the bucket.
func (r *FileRepo) DeleteFile(ctx context.Context, bucketName string, objectKey string) error {
_, err := r.client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
})
return s3errors.Wrap(err)
}
// ListObjectsV2 lists files with pagination support.
func (r *FileRepo) ListObjectsV2(ctx context.Context, bucketName string, prefix string, maxKeys int32, continuationToken *string) (*ListFilesResult, error) {
input := &s3.ListObjectsV2Input{
Bucket: aws.String(bucketName),
Prefix: aws.String(prefix),
MaxKeys: aws.Int32(maxKeys),
}
if continuationToken != nil && *continuationToken != "" {
input.ContinuationToken = continuationToken
}
resp, err := r.client.ListObjectsV2(ctx, input)
if err != nil {
return nil, s3errors.Wrap(err)
}
files := make([]FileInfo, 0, len(resp.Contents))
for _, obj := range resp.Contents {
if obj.Key == nil || obj.Size == nil || obj.LastModified == nil || obj.ETag == nil {
continue
}
files = append(files, FileInfo{
Key: *obj.Key,
Size: *obj.Size,
LastModified: *obj.LastModified,
ETag: *obj.ETag,
})
}
return &ListFilesResult{
Files: files,
NextContinuationToken: resp.NextContinuationToken,
}, nil
}
// GeneratePresignedURL generates a presigned URL for temporary file access.
func (r *FileRepo) GeneratePresignedURL(ctx context.Context, bucketName string, objectKey string, expiry time.Duration) (string, error) {
presignResult, err := r.presignClient.PresignGetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
}, func(opts *s3.PresignOptions) {
opts.Expires = expiry
})
if err != nil {
return "", s3errors.Wrap(err)
}
return presignResult.URL, nil
}
// CreateMultipartUpload initializes a multipart upload session.
func (r *FileRepo) CreateMultipartUpload(ctx context.Context, bucketName string, objectKey string) (string, error) {
resp, err := r.client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
})
if err != nil {
return "", s3errors.Wrap(err)
}
if resp.UploadId == nil {
return "", fmt.Errorf("failed to initialize multipart upload")
}
return *resp.UploadId, nil
}
// UploadPart uploads a single part of a multipart upload.
func (r *FileRepo) UploadPart(ctx context.Context, bucketName string, objectKey string, uploadId string, partNumber int32, data io.Reader) (string, error) {
resp, err := r.client.UploadPart(ctx, &s3.UploadPartInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
UploadId: aws.String(uploadId),
PartNumber: aws.Int32(partNumber),
Body: data,
})
if err != nil {
return "", s3errors.Wrap(err)
}
if resp.ETag == nil {
return "", fmt.Errorf("failed to upload part")
}
return *resp.ETag, nil
}
// CompleteMultipartUpload assembles all parts to complete the upload.
func (r *FileRepo) CompleteMultipartUpload(ctx context.Context, bucketName string, objectKey string, uploadId string, parts []Part) (string, error) {
sort.Slice(parts, func(i, j int) bool {
return parts[i].PartNumber < parts[j].PartNumber
})
completedParts := make([]types.CompletedPart, len(parts))
for i, p := range parts {
completedParts[i] = types.CompletedPart{
ETag: aws.String(p.ETag),
PartNumber: aws.Int32(p.PartNumber),
}
}
resp, err := r.client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
UploadId: aws.String(uploadId),
MultipartUpload: &types.CompletedMultipartUpload{
Parts: completedParts,
},
})
if err != nil {
return "", s3errors.Wrap(err)
}
if resp.Location == nil {
return "", fmt.Errorf("failed to complete multipart upload")
}
return *resp.Location, nil
}
// AbortMultipartUpload cancels an in-progress multipart upload.
func (r *FileRepo) AbortMultipartUpload(ctx context.Context, bucketName string, objectKey string, uploadId string) error {
_, err := r.client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
UploadId: aws.String(uploadId),
})
return s3errors.Wrap(err)
}

View File

@ -0,0 +1,148 @@
package data
import (
"context"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"gorm.io/gorm"
)
// FolderRepo implements folder persistence operations using GORM.
type FolderRepo struct {
data *Data
log *log.Helper
}
// NewFolderRepo creates a new FolderRepo.
func NewFolderRepo(data *Data, logger log.Logger) *FolderRepo {
return &FolderRepo{
data: data,
log: log.NewHelper(logger),
}
}
// FolderWithChildren holds a folder along with its sub-folders and files.
type FolderWithChildren struct {
Folder FolderPO
SubFolders []FolderPO
Files []FileMetaPO
}
// Create inserts a new folder record.
func (r *FolderRepo) Create(ctx context.Context, folder *FolderPO) error {
return r.data.DB(ctx).Create(folder).Error
}
// GetByID retrieves a folder by its ID. Returns nil if not found.
func (r *FolderRepo) GetByID(ctx context.Context, id string) (*FolderPO, error) {
var folder FolderPO
err := r.data.DB(ctx).Where("id = ?", id).First(&folder).Error
if err != nil {
if isRecordNotFound(err) {
return nil, nil
}
return nil, fmt.Errorf("failed to get folder: %w", err)
}
return &folder, nil
}
// GetWithChildren retrieves a folder with its sub-folders and files, filtered by ownerID.
func (r *FolderRepo) GetWithChildren(ctx context.Context, id string, ownerID string) (*FolderWithChildren, error) {
folder, err := r.GetByID(ctx, id)
if err != nil {
return nil, err
}
if folder == nil || folder.OwnerID != ownerID {
return nil, nil
}
var subFolders []FolderPO
if err := r.data.DB(ctx).
Where("parent_id = ? AND owner_id = ?", id, ownerID).
Order("name").
Find(&subFolders).Error; err != nil {
return nil, fmt.Errorf("failed to query sub-folders: %w", err)
}
var files []FileMetaPO
if err := r.data.DB(ctx).
Where("folder_id = ? AND owner_id = ?", id, ownerID).
Order("name").
Find(&files).Error; err != nil {
return nil, fmt.Errorf("failed to query files: %w", err)
}
return &FolderWithChildren{
Folder: *folder,
SubFolders: subFolders,
Files: files,
}, nil
}
// GetTree retrieves all folders owned by the given ownerID.
func (r *FolderRepo) GetTree(ctx context.Context, ownerID string) ([]FolderPO, error) {
var folders []FolderPO
err := r.data.DB(ctx).
Where("owner_id = ?", ownerID).
Order("name").
Find(&folders).Error
if err != nil {
return nil, fmt.Errorf("failed to get folder tree: %w", err)
}
return folders, nil
}
// Update modifies a folder's name and updated_at timestamp.
func (r *FolderRepo) Update(ctx context.Context, folder *FolderPO) error {
result := r.data.DB(ctx).Model(&FolderPO{}).
Where("id = ?", folder.ID).
Updates(map[string]interface{}{
"name": folder.Name,
"updated_at": folder.UpdatedAt,
})
if result.Error != nil {
return fmt.Errorf("failed to update folder: %w", result.Error)
}
return nil
}
// Delete removes a folder by ID and ownerID. Returns error if not found.
func (r *FolderRepo) Delete(ctx context.Context, id string, ownerID string) error {
result := r.data.DB(ctx).
Where("id = ? AND owner_id = ?", id, ownerID).
Delete(&FolderPO{})
if result.Error != nil {
return fmt.Errorf("failed to delete folder: %w", result.Error)
}
if result.RowsAffected == 0 {
return fmt.Errorf("folder not found or not owned by user")
}
return nil
}
// GetDescendantFileS3Keys returns all files in a folder and all its descendant folders.
// Uses a recursive CTE to walk the folder tree.
func (r *FolderRepo) GetDescendantFileS3Keys(ctx context.Context, id string, ownerID string) ([]FileMetaPO, error) {
var files []FileMetaPO
// Raw SQL with recursive CTE - GORM doesn't natively support recursive queries
err := r.data.DB(ctx).Raw(`
WITH RECURSIVE descendants AS (
SELECT id FROM folders WHERE id = ? AND owner_id = ?
UNION
SELECT f.id FROM folders f INNER JOIN descendants d ON f.parent_id = d.id
)
SELECT fi.id, fi.folder_id, fi.name, fi.s3_key, fi.s3_bucket, fi.size, fi.content_type, fi.owner_id, fi.created_at, fi.updated_at
FROM files fi INNER JOIN descendants d ON fi.folder_id = d.id
WHERE fi.owner_id = ?`,
id, ownerID, ownerID).Scan(&files).Error
if err != nil {
return nil, fmt.Errorf("failed to get descendant file S3 keys: %w", err)
}
return files, nil
}
// isRecordNotFound checks if the error is a GORM record-not-found error.
func isRecordNotFound(err error) bool {
return err == gorm.ErrRecordNotFound
}

View File

@ -0,0 +1,88 @@
package data
import (
"context"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"gorm.io/gorm"
)
// ShareRepo implements share link persistence operations using GORM.
type ShareRepo struct {
data *Data
log *log.Helper
}
// NewShareRepo creates a new ShareRepo.
func NewShareRepo(data *Data, logger log.Logger) *ShareRepo {
return &ShareRepo{
data: data,
log: log.NewHelper(logger),
}
}
// Create inserts a new share link record.
func (r *ShareRepo) Create(ctx context.Context, share *ShareLinkPO) error {
return r.data.DB(ctx).Create(share).Error
}
// GetByToken retrieves a share link by its token. Returns nil if not found.
func (r *ShareRepo) GetByToken(ctx context.Context, token string) (*ShareLinkPO, error) {
return r.queryOne(ctx, "token = ?", token)
}
// GetByID retrieves a share link by its ID. Returns nil if not found.
func (r *ShareRepo) GetByID(ctx context.Context, id string) (*ShareLinkPO, error) {
return r.queryOne(ctx, "id = ?", id)
}
// Delete removes a share link by ID and creator. Returns error if not found.
func (r *ShareRepo) Delete(ctx context.Context, id string, createdBy string) error {
result := r.data.DB(ctx).
Where("id = ? AND created_by = ?", id, createdBy).
Delete(&ShareLinkPO{})
if result.Error != nil {
return fmt.Errorf("failed to delete share link: %w", result.Error)
}
if result.RowsAffected == 0 {
return fmt.Errorf("share link not found or not owned by user")
}
return nil
}
// IncrementDownloadCount atomically increments the download count for the given token.
func (r *ShareRepo) IncrementDownloadCount(ctx context.Context, token string) error {
result := r.data.DB(ctx).Model(&ShareLinkPO{}).
Where("token = ?", token).
UpdateColumn("download_count", gorm.Expr("download_count + 1"))
if result.Error != nil {
return fmt.Errorf("failed to increment download count: %w", result.Error)
}
return nil
}
// ListByResource retrieves all share links for a given resource, ordered by created_at descending.
func (r *ShareRepo) ListByResource(ctx context.Context, resourceType string, resourceID string) ([]ShareLinkPO, error) {
var links []ShareLinkPO
err := r.data.DB(ctx).
Where("resource_type = ? AND resource_id = ?", resourceType, resourceID).
Order("created_at DESC").
Find(&links).Error
if err != nil {
return nil, fmt.Errorf("failed to list share links: %w", err)
}
return links, nil
}
func (r *ShareRepo) queryOne(ctx context.Context, query string, args ...interface{}) (*ShareLinkPO, error) {
var share ShareLinkPO
err := r.data.DB(ctx).Where(query, args...).First(&share).Error
if err != nil {
if isRecordNotFound(err) {
return nil, nil
}
return nil, fmt.Errorf("failed to query share link: %w", err)
}
return &share, nil
}

View File

@ -1,83 +0,0 @@
package database
import (
"database/sql"
"fmt"
_ "github.com/jackc/pgx/v5/stdlib"
"rag/file-system/internal/common"
)
func NewPostgresDB(databaseURL string) (*sql.DB, error) {
db, err := sql.Open("pgx", databaseURL)
if err != nil {
return nil, fmt.Errorf("failed to open database: %w", err)
}
if err := db.Ping(); err != nil {
return nil, fmt.Errorf("failed to ping database: %w", err)
}
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(5)
common.Logger.Info("connected to PostgreSQL")
return db, nil
}
func RunMigrations(db *sql.DB) error {
migrations := []string{
`CREATE TABLE IF NOT EXISTS folders (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
parent_id UUID REFERENCES folders(id) ON DELETE CASCADE,
name VARCHAR(255) NOT NULL,
owner_id VARCHAR(36) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE(parent_id, name, owner_id)
)`,
`CREATE INDEX IF NOT EXISTS idx_folders_parent ON folders(parent_id)`,
`CREATE INDEX IF NOT EXISTS idx_folders_owner ON folders(owner_id)`,
`CREATE TABLE IF NOT EXISTS files (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
folder_id UUID REFERENCES folders(id) ON DELETE CASCADE,
name VARCHAR(255) NOT NULL,
s3_key VARCHAR(512) NOT NULL,
s3_bucket VARCHAR(255) NOT NULL,
size BIGINT NOT NULL DEFAULT 0,
content_type VARCHAR(255) NOT NULL DEFAULT 'application/octet-stream',
owner_id VARCHAR(36) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
)`,
`CREATE INDEX IF NOT EXISTS idx_files_folder ON files(folder_id)`,
`CREATE INDEX IF NOT EXISTS idx_files_owner ON files(owner_id)`,
`CREATE INDEX IF NOT EXISTS idx_files_s3_key ON files(s3_key)`,
`CREATE TABLE IF NOT EXISTS share_links (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
resource_type VARCHAR(10) NOT NULL,
resource_id UUID NOT NULL,
token VARCHAR(32) NOT NULL UNIQUE,
password VARCHAR(255),
expires_at TIMESTAMPTZ,
download_count INT NOT NULL DEFAULT 0,
max_downloads INT,
created_by VARCHAR(36) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
)`,
`CREATE INDEX IF NOT EXISTS idx_share_token ON share_links(token)`,
`CREATE INDEX IF NOT EXISTS idx_share_resource ON share_links(resource_type, resource_id)`,
}
for _, m := range migrations {
if _, err := db.Exec(m); err != nil {
return fmt.Errorf("migration failed: %w\nSQL: %s", err, m)
}
}
common.Logger.Info("database migrations completed")
return nil
}

View File

@ -1,91 +0,0 @@
package repository
import (
"context"
"database/sql"
"fmt"
"rag/file-system/internal/domain/model"
)
type FileMetaRepoImpl struct {
db *sql.DB
}
func NewFileMetaRepository(db *sql.DB) *FileMetaRepoImpl {
return &FileMetaRepoImpl{db: db}
}
func (r *FileMetaRepoImpl) Create(ctx context.Context, file *model.FileMeta) error {
_, err := r.db.ExecContext(ctx,
`INSERT INTO files (id, folder_id, name, s3_key, s3_bucket, size, content_type, owner_id, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
file.ID, file.FolderID, file.Name, file.S3Key, file.S3Bucket, file.Size, file.ContentType, file.OwnerID, file.CreatedAt, file.UpdatedAt)
if err != nil {
return fmt.Errorf("failed to create file meta: %w", err)
}
return nil
}
func (r *FileMetaRepoImpl) GetByID(ctx context.Context, id string) (*model.FileMeta, error) {
var f model.FileMeta
err := r.db.QueryRowContext(ctx,
`SELECT id, folder_id, name, s3_key, s3_bucket, size, content_type, owner_id, created_at, updated_at FROM files WHERE id = $1`, id).
Scan(&f.ID, &f.FolderID, &f.Name, &f.S3Key, &f.S3Bucket, &f.Size, &f.ContentType, &f.OwnerID, &f.CreatedAt, &f.UpdatedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("failed to get file meta: %w", err)
}
return &f, nil
}
func (r *FileMetaRepoImpl) GetByFolder(ctx context.Context, folderID string) ([]model.FileMeta, error) {
rows, err := r.db.QueryContext(ctx,
`SELECT id, folder_id, name, s3_key, s3_bucket, size, content_type, owner_id, created_at, updated_at FROM files WHERE folder_id = $1 ORDER BY name`, folderID)
if err != nil {
return nil, fmt.Errorf("failed to get files by folder: %w", err)
}
defer rows.Close()
return scanFiles(rows)
}
func (r *FileMetaRepoImpl) Move(ctx context.Context, fileID string, targetFolderID string, ownerID string) error {
result, err := r.db.ExecContext(ctx,
`UPDATE files SET folder_id = $1, updated_at = now() WHERE id = $2 AND owner_id = $3`,
targetFolderID, fileID, ownerID)
if err != nil {
return fmt.Errorf("failed to move file: %w", err)
}
rows, _ := result.RowsAffected()
if rows == 0 {
return fmt.Errorf("file not found or not owned by user")
}
return nil
}
func (r *FileMetaRepoImpl) Delete(ctx context.Context, id string, ownerID string) error {
result, err := r.db.ExecContext(ctx, `DELETE FROM files WHERE id = $1 AND owner_id = $2`, id, ownerID)
if err != nil {
return fmt.Errorf("failed to delete file meta: %w", err)
}
rows, _ := result.RowsAffected()
if rows == 0 {
return fmt.Errorf("file not found or not owned by user")
}
return nil
}
func scanFiles(rows *sql.Rows) ([]model.FileMeta, error) {
var files []model.FileMeta
for rows.Next() {
var f model.FileMeta
if err := rows.Scan(&f.ID, &f.FolderID, &f.Name, &f.S3Key, &f.S3Bucket, &f.Size, &f.ContentType, &f.OwnerID, &f.CreatedAt, &f.UpdatedAt); err != nil {
return nil, fmt.Errorf("failed to scan file: %w", err)
}
files = append(files, f)
}
return files, rows.Err()
}

View File

@ -1,154 +0,0 @@
package repository
import (
"context"
"database/sql"
"fmt"
"rag/file-system/internal/domain/model"
)
type FolderRepoImpl struct {
db *sql.DB
}
func NewFolderRepository(db *sql.DB) *FolderRepoImpl {
return &FolderRepoImpl{db: db}
}
func (r *FolderRepoImpl) Create(ctx context.Context, folder *model.Folder) error {
_, err := r.db.ExecContext(ctx,
`INSERT INTO folders (id, parent_id, name, owner_id, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6)`,
folder.ID, folder.ParentID, folder.Name, folder.OwnerID, folder.CreatedAt, folder.UpdatedAt)
if err != nil {
return fmt.Errorf("failed to create folder: %w", err)
}
return nil
}
func (r *FolderRepoImpl) GetByID(ctx context.Context, id string) (*model.Folder, error) {
var f model.Folder
var parentID sql.NullString
err := r.db.QueryRowContext(ctx,
`SELECT id, parent_id, name, owner_id, created_at, updated_at FROM folders WHERE id = $1`, id).
Scan(&f.ID, &parentID, &f.Name, &f.OwnerID, &f.CreatedAt, &f.UpdatedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("failed to get folder: %w", err)
}
if parentID.Valid {
f.ParentID = &parentID.String
}
return &f, nil
}
func (r *FolderRepoImpl) GetWithChildren(ctx context.Context, id string, ownerID string) (*model.FolderWithChildren, error) {
folder, err := r.GetByID(ctx, id)
if err != nil {
return nil, err
}
if folder == nil || folder.OwnerID != ownerID {
return nil, nil
}
subFolders, err := r.queryFolders(ctx,
`SELECT id, parent_id, name, owner_id, created_at, updated_at FROM folders WHERE parent_id = $1 AND owner_id = $2 ORDER BY name`, id, ownerID)
if err != nil {
return nil, err
}
files, err := r.queryFiles(ctx,
`SELECT id, folder_id, name, s3_key, s3_bucket, size, content_type, owner_id, created_at, updated_at FROM files WHERE folder_id = $1 AND owner_id = $2 ORDER BY name`, id, ownerID)
if err != nil {
return nil, err
}
return &model.FolderWithChildren{
Folder: *folder,
SubFolders: subFolders,
Files: files,
}, nil
}
func (r *FolderRepoImpl) GetTree(ctx context.Context, ownerID string) ([]model.Folder, error) {
return r.queryFolders(ctx,
`SELECT id, parent_id, name, owner_id, created_at, updated_at FROM folders WHERE owner_id = $1 ORDER BY name`, ownerID)
}
func (r *FolderRepoImpl) Update(ctx context.Context, folder *model.Folder) error {
_, err := r.db.ExecContext(ctx,
`UPDATE folders SET name = $1, updated_at = $2 WHERE id = $3`,
folder.Name, folder.UpdatedAt, folder.ID)
if err != nil {
return fmt.Errorf("failed to update folder: %w", err)
}
return nil
}
func (r *FolderRepoImpl) Delete(ctx context.Context, id string, ownerID string) error {
result, err := r.db.ExecContext(ctx, `DELETE FROM folders WHERE id = $1 AND owner_id = $2`, id, ownerID)
if err != nil {
return fmt.Errorf("failed to delete folder: %w", err)
}
rows, _ := result.RowsAffected()
if rows == 0 {
return fmt.Errorf("folder not found or not owned by user")
}
return nil
}
func (r *FolderRepoImpl) GetDescendantFileS3Keys(ctx context.Context, id string, ownerID string) ([]model.FileMeta, error) {
query := `
WITH RECURSIVE descendants AS (
SELECT id FROM folders WHERE id = $1 AND owner_id = $2
UNION
SELECT f.id FROM folders f INNER JOIN descendants d ON f.parent_id = d.id
)
SELECT fi.id, fi.folder_id, fi.name, fi.s3_key, fi.s3_bucket, fi.size, fi.content_type, fi.owner_id, fi.created_at, fi.updated_at
FROM files fi INNER JOIN descendants d ON fi.folder_id = d.id
WHERE fi.owner_id = $2`
return r.queryFiles(ctx, query, id, ownerID)
}
func (r *FolderRepoImpl) queryFolders(ctx context.Context, query string, args ...interface{}) ([]model.Folder, error) {
rows, err := r.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("failed to query folders: %w", err)
}
defer rows.Close()
var folders []model.Folder
for rows.Next() {
var f model.Folder
var parentID sql.NullString
if err := rows.Scan(&f.ID, &parentID, &f.Name, &f.OwnerID, &f.CreatedAt, &f.UpdatedAt); err != nil {
return nil, fmt.Errorf("failed to scan folder: %w", err)
}
if parentID.Valid {
f.ParentID = &parentID.String
}
folders = append(folders, f)
}
return folders, rows.Err()
}
func (r *FolderRepoImpl) queryFiles(ctx context.Context, query string, args ...interface{}) ([]model.FileMeta, error) {
rows, err := r.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("failed to query files: %w", err)
}
defer rows.Close()
var files []model.FileMeta
for rows.Next() {
var f model.FileMeta
if err := rows.Scan(&f.ID, &f.FolderID, &f.Name, &f.S3Key, &f.S3Bucket, &f.Size, &f.ContentType, &f.OwnerID, &f.CreatedAt, &f.UpdatedAt); err != nil {
return nil, fmt.Errorf("failed to scan file: %w", err)
}
files = append(files, f)
}
return files, rows.Err()
}

View File

@ -1,90 +0,0 @@
package repository
import (
"context"
"database/sql"
"fmt"
"rag/file-system/internal/domain/model"
)
type ShareRepoImpl struct {
db *sql.DB
}
func NewShareRepository(db *sql.DB) *ShareRepoImpl {
return &ShareRepoImpl{db: db}
}
func (r *ShareRepoImpl) Create(ctx context.Context, share *model.ShareLink) error {
_, err := r.db.ExecContext(ctx,
`INSERT INTO share_links (id, resource_type, resource_id, token, password, expires_at, download_count, max_downloads, created_by, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
share.ID, share.ResourceType, share.ResourceID, share.Token, share.Password, share.ExpiresAt,
share.DownloadCount, share.MaxDownloads, share.CreatedBy, share.CreatedAt)
if err != nil {
return fmt.Errorf("failed to create share link: %w", err)
}
return nil
}
func (r *ShareRepoImpl) GetByToken(ctx context.Context, token string) (*model.ShareLink, error) {
return r.queryOne(ctx, `SELECT id, resource_type, resource_id, token, password, expires_at, download_count, max_downloads, created_by, created_at FROM share_links WHERE token = $1`, token)
}
func (r *ShareRepoImpl) GetByID(ctx context.Context, id string) (*model.ShareLink, error) {
return r.queryOne(ctx, `SELECT id, resource_type, resource_id, token, password, expires_at, download_count, max_downloads, created_by, created_at FROM share_links WHERE id = $1`, id)
}
func (r *ShareRepoImpl) Delete(ctx context.Context, id string, createdBy string) error {
result, err := r.db.ExecContext(ctx, `DELETE FROM share_links WHERE id = $1 AND created_by = $2`, id, createdBy)
if err != nil {
return fmt.Errorf("failed to delete share link: %w", err)
}
rows, _ := result.RowsAffected()
if rows == 0 {
return fmt.Errorf("share link not found or not owned by user")
}
return nil
}
func (r *ShareRepoImpl) IncrementDownloadCount(ctx context.Context, token string) error {
_, err := r.db.ExecContext(ctx, `UPDATE share_links SET download_count = download_count + 1 WHERE token = $1`, token)
if err != nil {
return fmt.Errorf("failed to increment download count: %w", err)
}
return nil
}
func (r *ShareRepoImpl) ListByResource(ctx context.Context, resourceType string, resourceID string) ([]model.ShareLink, error) {
rows, err := r.db.QueryContext(ctx,
`SELECT id, resource_type, resource_id, token, password, expires_at, download_count, max_downloads, created_by, created_at FROM share_links WHERE resource_type = $1 AND resource_id = $2 ORDER BY created_at DESC`,
resourceType, resourceID)
if err != nil {
return nil, fmt.Errorf("failed to list share links: %w", err)
}
defer rows.Close()
var links []model.ShareLink
for rows.Next() {
var s model.ShareLink
if err := rows.Scan(&s.ID, &s.ResourceType, &s.ResourceID, &s.Token, &s.Password, &s.ExpiresAt, &s.DownloadCount, &s.MaxDownloads, &s.CreatedBy, &s.CreatedAt); err != nil {
return nil, fmt.Errorf("failed to scan share link: %w", err)
}
links = append(links, s)
}
return links, rows.Err()
}
func (r *ShareRepoImpl) queryOne(ctx context.Context, query string, args ...interface{}) (*model.ShareLink, error) {
var s model.ShareLink
err := r.db.QueryRowContext(ctx, query, args...).
Scan(&s.ID, &s.ResourceType, &s.ResourceID, &s.Token, &s.Password, &s.ExpiresAt, &s.DownloadCount, &s.MaxDownloads, &s.CreatedBy, &s.CreatedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("failed to query share link: %w", err)
}
return &s, nil
}

View File

@ -1,56 +0,0 @@
package s3
import (
"context"
"rag/file-system/internal/common"
"log"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
type RustFSClient struct {
client *s3.Client
presignClient *s3.PresignClient
}
func NewRustFSClient(cfg *common.Config) *RustFSClient {
customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{
URL: cfg.RustFSEndpoint,
SigningRegion: cfg.RustFSRegion,
}, nil
})
awsCfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithRegion(cfg.RustFSRegion),
config.WithEndpointResolverWithOptions(customResolver),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
cfg.RustFSAccessKeyID,
cfg.RustFSSecretAccessKey,
"",
)),
)
if err != nil {
log.Fatalf("unable to load SDK config, %v", err)
}
client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
o.UsePathStyle = true
})
return &RustFSClient{
client: client,
presignClient: s3.NewPresignClient(client),
}
}
func (c *RustFSClient) S3Client() *s3.Client {
return c.client
}
func (c *RustFSClient) PresignClient() *s3.PresignClient {
return c.presignClient
}

View File

@ -1,225 +0,0 @@
package s3
import (
"context"
"rag/file-system/internal/common"
"rag/file-system/internal/domain/repository"
"io"
"sort"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
)
const maxContentPreviewSize = 10 * 1024 * 1024 // 10MB
type S3FileRepository struct {
client *RustFSClient
}
func NewS3FileRepository(client *RustFSClient) repository.FileRepository {
return &S3FileRepository{client: client}
}
func (r *S3FileRepository) UploadFile(ctx context.Context, bucketName string, objectKey string, data io.Reader) error {
_, err := r.client.S3Client().PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
Body: data,
})
return common.WrapS3Error(err)
}
func (r *S3FileRepository) DownloadFile(ctx context.Context, bucketName string, objectKey string) (io.ReadCloser, error) {
resp, err := r.client.S3Client().GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
})
if err != nil {
return nil, common.WrapS3Error(err)
}
return resp.Body, nil
}
func (r *S3FileRepository) ListBuckets(ctx context.Context) ([]string, error) {
resp, err := r.client.S3Client().ListBuckets(ctx, &s3.ListBucketsInput{})
if err != nil {
return nil, common.WrapS3Error(err)
}
var buckets []string
for _, b := range resp.Buckets {
if b.Name != nil {
buckets = append(buckets, *b.Name)
}
}
return buckets, nil
}
func (r *S3FileRepository) CreateBucket(ctx context.Context, bucketName string) error {
_, err := r.client.S3Client().CreateBucket(ctx, &s3.CreateBucketInput{
Bucket: aws.String(bucketName),
})
return common.WrapS3Error(err)
}
func (r *S3FileRepository) DeleteBucket(ctx context.Context, bucketName string) error {
_, err := r.client.S3Client().DeleteBucket(ctx, &s3.DeleteBucketInput{
Bucket: aws.String(bucketName),
})
return common.WrapS3Error(err)
}
// GetFileContent retrieves text file content for preview (e.g., Markdown files)
func (r *S3FileRepository) GetFileContent(ctx context.Context, bucketName string, objectKey string) (string, error) {
resp, err := r.client.S3Client().GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
})
if err != nil {
return "", common.WrapS3Error(err)
}
defer resp.Body.Close()
data, err := io.ReadAll(io.LimitReader(resp.Body, maxContentPreviewSize))
if err != nil {
return "", err
}
if int64(len(data)) >= maxContentPreviewSize {
return "", common.NewBusinessException("file too large for content preview (max 10MB)")
}
return string(data), nil
}
// DeleteFile removes a file from the bucket
func (r *S3FileRepository) DeleteFile(ctx context.Context, bucketName string, objectKey string) error {
_, err := r.client.S3Client().DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
})
return common.WrapS3Error(err)
}
// ListObjectsV2 lists files with pagination support
func (r *S3FileRepository) ListObjectsV2(ctx context.Context, bucketName string, prefix string, maxKeys int32, continuationToken *string) (*repository.ListFilesResult, error) {
input := &s3.ListObjectsV2Input{
Bucket: aws.String(bucketName),
Prefix: aws.String(prefix),
MaxKeys: aws.Int32(maxKeys),
}
if continuationToken != nil && *continuationToken != "" {
input.ContinuationToken = continuationToken
}
resp, err := r.client.S3Client().ListObjectsV2(ctx, input)
if err != nil {
return nil, common.WrapS3Error(err)
}
files := make([]repository.FileInfo, 0, len(resp.Contents))
for _, obj := range resp.Contents {
if obj.Key == nil || obj.Size == nil || obj.LastModified == nil || obj.ETag == nil {
continue
}
files = append(files, repository.FileInfo{
Key: *obj.Key,
Size: *obj.Size,
LastModified: *obj.LastModified,
ETag: *obj.ETag,
})
}
return &repository.ListFilesResult{
Files: files,
NextContinuationToken: resp.NextContinuationToken,
}, nil
}
// GeneratePresignedURL generates a presigned URL for temporary file access
func (r *S3FileRepository) GeneratePresignedURL(ctx context.Context, bucketName string, objectKey string, expiry time.Duration) (string, error) {
presignResult, err := r.client.PresignClient().PresignGetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
}, func(opts *s3.PresignOptions) {
opts.Expires = expiry
})
if err != nil {
return "", common.WrapS3Error(err)
}
return presignResult.URL, nil
}
// CreateMultipartUpload initializes a multipart upload session
func (r *S3FileRepository) CreateMultipartUpload(ctx context.Context, bucketName string, objectKey string) (string, error) {
resp, err := r.client.S3Client().CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
})
if err != nil {
return "", common.WrapS3Error(err)
}
if resp.UploadId == nil {
return "", common.NewBusinessException("failed to initialize multipart upload")
}
return *resp.UploadId, nil
}
// UploadPart uploads a single part of a multipart upload
func (r *S3FileRepository) UploadPart(ctx context.Context, bucketName string, objectKey string, uploadId string, partNumber int32, data io.Reader) (string, error) {
resp, err := r.client.S3Client().UploadPart(ctx, &s3.UploadPartInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
UploadId: aws.String(uploadId),
PartNumber: aws.Int32(partNumber),
Body: data,
})
if err != nil {
return "", common.WrapS3Error(err)
}
if resp.ETag == nil {
return "", common.NewBusinessException("failed to upload part")
}
return *resp.ETag, nil
}
// CompleteMultipartUpload assembles all parts to complete the upload
func (r *S3FileRepository) CompleteMultipartUpload(ctx context.Context, bucketName string, objectKey string, uploadId string, parts []common.Part) (string, error) {
sort.Slice(parts, func(i, j int) bool {
return parts[i].PartNumber < parts[j].PartNumber
})
completedParts := make([]types.CompletedPart, len(parts))
for i, p := range parts {
completedParts[i] = types.CompletedPart{
ETag: aws.String(p.ETag),
PartNumber: aws.Int32(p.PartNumber),
}
}
resp, err := r.client.S3Client().CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
UploadId: aws.String(uploadId),
MultipartUpload: &types.CompletedMultipartUpload{
Parts: completedParts,
},
})
if err != nil {
return "", common.WrapS3Error(err)
}
if resp.Location == nil {
return "", common.NewBusinessException("failed to complete multipart upload")
}
return *resp.Location, nil
}
// AbortMultipartUpload cancels an in-progress multipart upload
func (r *S3FileRepository) AbortMultipartUpload(ctx context.Context, bucketName string, objectKey string, uploadId string) error {
_, err := r.client.S3Client().AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
UploadId: aws.String(uploadId),
})
return common.WrapS3Error(err)
}