file-system/internal/data/file_repo.go
向宁 bcd637387a feat: add data layer with GORM models, S3 repo, PG repos
Replace old infrastructure layer with Kratos-style data layer:
- data.go: GORM connection, transaction support, Wire ProviderSet, PO models
- file_repo.go: All 12 S3 operations (upload, download, multipart, presign, buckets)
- folder_repo.go: GORM queries including recursive CTE for descendant files
- file_meta_repo.go: CRUD + move operations for file metadata
- share_repo.go: CRUD + increment download count for share links

Deleted old infrastructure/database, infrastructure/repository, infrastructure/s3.
Kept infrastructure/grpc for later integration.
2026-05-25 12:57:42 +08:00

286 lines
8.4 KiB
Go

package data
import (
"context"
"fmt"
"io"
"sort"
"time"
"rag/file-system/internal/conf"
"rag/file-system/internal/pkg/s3errors"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
)
// maxContentPreviewSize is the byte limit GetFileContent applies when reading
// an object body for text preview.
const maxContentPreviewSize = 10 * 1024 * 1024 // 10 MiB
// Part represents a single uploaded part in a multipart upload.
// A slice of Part values is what CompleteMultipartUpload converts into the
// S3 completed-part list.
type Part struct {
// ETag is the part's entity tag; UploadPart returns this value for each part.
ETag string
// PartNumber identifies the part; CompleteMultipartUpload orders parts by it.
PartNumber int32
}
// FileInfo holds metadata about an S3 object.
// ListObjectsV2 populates it from the entries of an S3 listing response.
type FileInfo struct {
// Key is the object key within the bucket.
Key string
// Size is the object size as reported by the listing.
Size int64
// LastModified is the modification timestamp reported by the listing.
LastModified time.Time
// ETag is the object's entity tag as reported by the listing.
ETag string
}
// ListFilesResult holds the result of a paginated list operation.
type ListFilesResult struct {
// Files contains the objects returned for the current page.
Files []FileInfo
// NextContinuationToken is taken verbatim from the S3 response; supply it as
// the continuationToken argument of ListObjectsV2 to continue the listing.
NextContinuationToken *string
}
// FileRepo handles all S3 storage operations.
// Construct it with NewFileRepo, which initializes both clients.
type FileRepo struct {
// client issues the object, bucket and multipart-upload requests.
client *s3.Client
// presignClient is built from client and is used by GeneratePresignedURL.
presignClient *s3.PresignClient
}
// NewFileRepo builds a FileRepo whose S3 client is configured from the S3
// section of c: region, endpoint, and a static access-key/secret-key pair.
//
// Every request is routed to the configured endpoint and the client uses
// path-style addressing. The function panics if the SDK configuration cannot
// be loaded, since it has no error return.
func NewFileRepo(c *conf.Data) *FileRepo {
	s3Conf := c.GetS3()

	// Always resolve to the configured endpoint, whatever service/region the
	// SDK asks about.
	resolveEndpoint := func(_, _ string, _ ...interface{}) (aws.Endpoint, error) {
		endpoint := aws.Endpoint{
			URL:           s3Conf.GetEndpoint(),
			SigningRegion: s3Conf.GetRegion(),
		}
		return endpoint, nil
	}

	// Static credentials with an empty session token.
	staticCreds := credentials.NewStaticCredentialsProvider(
		s3Conf.GetAccessKey(),
		s3Conf.GetSecretKey(),
		"",
	)

	awsCfg, err := config.LoadDefaultConfig(
		context.TODO(),
		config.WithRegion(s3Conf.GetRegion()),
		config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(resolveEndpoint)),
		config.WithCredentialsProvider(staticCreds),
	)
	if err != nil {
		panic(fmt.Sprintf("unable to load S3 SDK config: %v", err))
	}

	usePathStyle := func(o *s3.Options) {
		o.UsePathStyle = true
	}
	s3Client := s3.NewFromConfig(awsCfg, usePathStyle)

	repo := &FileRepo{client: s3Client}
	repo.presignClient = s3.NewPresignClient(s3Client)
	return repo
}
// UploadFile writes the contents of data to the object objectKey in the
// bucket bucketName using a single PutObject request.
// The result of that request is passed through s3errors.Wrap and returned.
func (r *FileRepo) UploadFile(ctx context.Context, bucketName string, objectKey string, data io.Reader) error {
	input := &s3.PutObjectInput{
		Bucket: aws.String(bucketName),
		Key:    aws.String(objectKey),
		Body:   data,
	}
	_, putErr := r.client.PutObject(ctx, input)
	return s3errors.Wrap(putErr)
}
// DownloadFile fetches the object objectKey from bucketName and hands back
// its body as a stream.
// The body is returned unclosed; the caller is responsible for closing it.
// A failed GetObject call is reported through s3errors.Wrap.
func (r *FileRepo) DownloadFile(ctx context.Context, bucketName string, objectKey string) (io.ReadCloser, error) {
	input := &s3.GetObjectInput{
		Bucket: aws.String(bucketName),
		Key:    aws.String(objectKey),
	}
	output, getErr := r.client.GetObject(ctx, input)
	if getErr != nil {
		return nil, s3errors.Wrap(getErr)
	}
	return output.Body, nil
}
// ListBuckets returns the names of all buckets reported by S3.
// Entries without a name are skipped. A failed ListBuckets call is reported
// through s3errors.Wrap.
func (r *FileRepo) ListBuckets(ctx context.Context) ([]string, error) {
	output, listErr := r.client.ListBuckets(ctx, &s3.ListBucketsInput{})
	if listErr != nil {
		return nil, s3errors.Wrap(listErr)
	}

	// Left nil (not pre-sized) so an empty listing yields a nil slice.
	var names []string
	for i := range output.Buckets {
		name := output.Buckets[i].Name
		if name == nil {
			continue
		}
		names = append(names, *name)
	}
	return names, nil
}
// CreateBucket asks S3 to create a bucket named bucketName.
// The result of the request is passed through s3errors.Wrap and returned.
func (r *FileRepo) CreateBucket(ctx context.Context, bucketName string) error {
	input := &s3.CreateBucketInput{
		Bucket: aws.String(bucketName),
	}
	_, createErr := r.client.CreateBucket(ctx, input)
	return s3errors.Wrap(createErr)
}
// DeleteBucket asks S3 to delete the bucket named bucketName.
// The result of the request is passed through s3errors.Wrap and returned.
func (r *FileRepo) DeleteBucket(ctx context.Context, bucketName string) error {
	input := &s3.DeleteBucketInput{
		Bucket: aws.String(bucketName),
	}
	_, deleteErr := r.client.DeleteBucket(ctx, input)
	return s3errors.Wrap(deleteErr)
}
// GetFileContent retrieves an object's body as a string for text preview
// (e.g., Markdown files).
//
// Objects of up to and including maxContentPreviewSize bytes are returned in
// full. Larger objects are rejected with an error rather than truncated.
// A failed GetObject call is reported through s3errors.Wrap; an error while
// reading the body is returned as-is.
func (r *FileRepo) GetFileContent(ctx context.Context, bucketName string, objectKey string) (string, error) {
	resp, err := r.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucketName),
		Key:    aws.String(objectKey),
	})
	if err != nil {
		return "", s3errors.Wrap(err)
	}
	defer resp.Body.Close()

	// Read one byte past the limit: that extra byte is the only way to tell
	// an object that is exactly at the limit (allowed) from one that exceeds
	// it (rejected). Capping the reader at the limit itself would make both
	// cases look identical.
	data, err := io.ReadAll(io.LimitReader(resp.Body, maxContentPreviewSize+1))
	if err != nil {
		return "", err
	}
	if int64(len(data)) > maxContentPreviewSize {
		return "", fmt.Errorf("file too large for content preview (max 10MB)")
	}
	return string(data), nil
}
// DeleteFile issues a DeleteObject request for objectKey in bucketName.
// The result of the request is passed through s3errors.Wrap and returned.
func (r *FileRepo) DeleteFile(ctx context.Context, bucketName string, objectKey string) error {
	input := &s3.DeleteObjectInput{
		Bucket: aws.String(bucketName),
		Key:    aws.String(objectKey),
	}
	_, deleteErr := r.client.DeleteObject(ctx, input)
	return s3errors.Wrap(deleteErr)
}
// ListObjectsV2 returns one page of objects from bucketName whose keys start
// with prefix.
//
// maxKeys is forwarded to S3 as the page size. continuationToken is forwarded
// only when it is non-nil and non-empty. Listing entries missing a key, size,
// modification time or ETag are left out of the result. The response's
// NextContinuationToken is returned unchanged. A failed request is reported
// through s3errors.Wrap.
func (r *FileRepo) ListObjectsV2(ctx context.Context, bucketName string, prefix string, maxKeys int32, continuationToken *string) (*ListFilesResult, error) {
	req := s3.ListObjectsV2Input{
		Bucket:  aws.String(bucketName),
		Prefix:  aws.String(prefix),
		MaxKeys: aws.Int32(maxKeys),
	}
	hasToken := continuationToken != nil && *continuationToken != ""
	if hasToken {
		req.ContinuationToken = continuationToken
	}

	page, listErr := r.client.ListObjectsV2(ctx, &req)
	if listErr != nil {
		return nil, s3errors.Wrap(listErr)
	}

	result := &ListFilesResult{
		Files:                 make([]FileInfo, 0, len(page.Contents)),
		NextContinuationToken: page.NextContinuationToken,
	}
	for i := range page.Contents {
		entry := &page.Contents[i]
		complete := entry.Key != nil && entry.Size != nil && entry.LastModified != nil && entry.ETag != nil
		if !complete {
			// Incomplete entries cannot be represented as a FileInfo.
			continue
		}
		result.Files = append(result.Files, FileInfo{
			Key:          *entry.Key,
			Size:         *entry.Size,
			LastModified: *entry.LastModified,
			ETag:         *entry.ETag,
		})
	}
	return result, nil
}
// GeneratePresignedURL produces a presigned GetObject URL for objectKey in
// bucketName, with expiry set as the presign lifetime.
// A presigning failure is reported through s3errors.Wrap.
func (r *FileRepo) GeneratePresignedURL(ctx context.Context, bucketName string, objectKey string, expiry time.Duration) (string, error) {
	input := &s3.GetObjectInput{
		Bucket: aws.String(bucketName),
		Key:    aws.String(objectKey),
	}
	withExpiry := func(o *s3.PresignOptions) {
		o.Expires = expiry
	}

	signed, signErr := r.presignClient.PresignGetObject(ctx, input, withExpiry)
	if signErr != nil {
		return "", s3errors.Wrap(signErr)
	}
	return signed.URL, nil
}
// CreateMultipartUpload starts a multipart upload for objectKey in bucketName
// and returns the upload ID assigned by S3.
// A failed request is reported through s3errors.Wrap; a response that carries
// no upload ID is reported as an error as well.
func (r *FileRepo) CreateMultipartUpload(ctx context.Context, bucketName string, objectKey string) (string, error) {
	input := &s3.CreateMultipartUploadInput{
		Bucket: aws.String(bucketName),
		Key:    aws.String(objectKey),
	}
	output, createErr := r.client.CreateMultipartUpload(ctx, input)
	switch {
	case createErr != nil:
		return "", s3errors.Wrap(createErr)
	case output.UploadId == nil:
		return "", fmt.Errorf("failed to initialize multipart upload")
	}
	return *output.UploadId, nil
}
// UploadPart sends data as part partNumber of the multipart upload uploadId
// for objectKey in bucketName, and returns the ETag S3 reports for that part.
// A failed request is reported through s3errors.Wrap; a response without an
// ETag is reported as an error as well.
func (r *FileRepo) UploadPart(ctx context.Context, bucketName string, objectKey string, uploadId string, partNumber int32, data io.Reader) (string, error) {
	input := &s3.UploadPartInput{
		Bucket:     aws.String(bucketName),
		Key:        aws.String(objectKey),
		UploadId:   aws.String(uploadId),
		PartNumber: aws.Int32(partNumber),
		Body:       data,
	}
	output, uploadErr := r.client.UploadPart(ctx, input)
	if uploadErr != nil {
		return "", s3errors.Wrap(uploadErr)
	}
	if etag := output.ETag; etag != nil {
		return *etag, nil
	}
	return "", fmt.Errorf("failed to upload part")
}
// CompleteMultipartUpload finishes the multipart upload uploadId for
// objectKey in bucketName by submitting the given parts, and returns the
// Location reported by S3.
//
// parts may be supplied in any order; they are submitted in ascending
// PartNumber order. The caller's slice is not modified. A failed request is
// reported through s3errors.Wrap; a response without a Location is reported
// as an error as well.
func (r *FileRepo) CompleteMultipartUpload(ctx context.Context, bucketName string, objectKey string, uploadId string, parts []Part) (string, error) {
	// Sort a private copy: sorting parts directly would reorder the caller's
	// backing array as a hidden side effect.
	ordered := append([]Part(nil), parts...)
	sort.Slice(ordered, func(i, j int) bool {
		return ordered[i].PartNumber < ordered[j].PartNumber
	})

	completedParts := make([]types.CompletedPart, len(ordered))
	for i, p := range ordered {
		completedParts[i] = types.CompletedPart{
			ETag:       aws.String(p.ETag),
			PartNumber: aws.Int32(p.PartNumber),
		}
	}

	resp, err := r.client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(bucketName),
		Key:      aws.String(objectKey),
		UploadId: aws.String(uploadId),
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})
	if err != nil {
		return "", s3errors.Wrap(err)
	}
	if resp.Location == nil {
		return "", fmt.Errorf("failed to complete multipart upload")
	}
	return *resp.Location, nil
}
// AbortMultipartUpload issues an abort request for the multipart upload
// uploadId of objectKey in bucketName.
// The result of the request is passed through s3errors.Wrap and returned.
func (r *FileRepo) AbortMultipartUpload(ctx context.Context, bucketName string, objectKey string, uploadId string) error {
	input := &s3.AbortMultipartUploadInput{
		Bucket:   aws.String(bucketName),
		Key:      aws.String(objectKey),
		UploadId: aws.String(uploadId),
	}
	_, abortErr := r.client.AbortMultipartUpload(ctx, input)
	return s3errors.Wrap(abortErr)
}