Replace old infrastructure layer with Kratos-style data layer:
- data.go: GORM connection, transaction support, Wire ProviderSet, PO models
- file_repo.go: All 12 S3 operations (upload, download, multipart, presign, buckets)
- folder_repo.go: GORM queries including recursive CTE for descendant files
- file_meta_repo.go: CRUD + move operations for file metadata
- share_repo.go: CRUD + increment download count for share links

Deleted old infrastructure/database, infrastructure/repository, infrastructure/s3.
Kept infrastructure/grpc for later integration.
286 lines
8.4 KiB
Go
286 lines
8.4 KiB
Go
package data
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"sort"
|
|
"time"
|
|
|
|
"rag/file-system/internal/conf"
|
|
"rag/file-system/internal/pkg/s3errors"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
"github.com/aws/aws-sdk-go-v2/config"
|
|
"github.com/aws/aws-sdk-go-v2/credentials"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
|
)
|
|
|
|
// maxContentPreviewSize is the byte limit GetFileContent applies when reading
// object content for a text preview (10 MiB).
const maxContentPreviewSize = 10 << 20
|
|
|
|
// Part identifies one already-uploaded chunk of a multipart upload. A slice
// of Part values is what CompleteMultipartUpload takes to assemble the final
// object.
type Part struct {
	// ETag is the entity tag of the uploaded part (the value UploadPart
	// returns).
	ETag string
	// PartNumber is the position of this part within the upload.
	PartNumber int32
}
|
|
|
|
// FileInfo describes a single S3 object as reported by a bucket listing.
type FileInfo struct {
	// Key is the object key within its bucket.
	Key string
	// Size is the object size as reported by S3.
	Size int64
	// LastModified is the object's last-modified timestamp.
	LastModified time.Time
	// ETag is the object's entity tag.
	ETag string
}
|
|
|
|
// ListFilesResult holds the result of a paginated list operation.
|
|
type ListFilesResult struct {
|
|
Files []FileInfo
|
|
NextContinuationToken *string
|
|
}
|
|
|
|
// FileRepo handles all S3 storage operations.
|
|
type FileRepo struct {
|
|
client *s3.Client
|
|
presignClient *s3.PresignClient
|
|
}
|
|
|
|
// NewFileRepo creates a new FileRepo with an S3 client configured from the provided config.
|
|
func NewFileRepo(c *conf.Data) *FileRepo {
|
|
s3Conf := c.GetS3()
|
|
customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
|
|
return aws.Endpoint{
|
|
URL: s3Conf.GetEndpoint(),
|
|
SigningRegion: s3Conf.GetRegion(),
|
|
}, nil
|
|
})
|
|
|
|
awsCfg, err := config.LoadDefaultConfig(context.TODO(),
|
|
config.WithRegion(s3Conf.GetRegion()),
|
|
config.WithEndpointResolverWithOptions(customResolver),
|
|
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
|
|
s3Conf.GetAccessKey(),
|
|
s3Conf.GetSecretKey(),
|
|
"",
|
|
)),
|
|
)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("unable to load S3 SDK config: %v", err))
|
|
}
|
|
|
|
client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
|
|
o.UsePathStyle = true
|
|
})
|
|
|
|
return &FileRepo{
|
|
client: client,
|
|
presignClient: s3.NewPresignClient(client),
|
|
}
|
|
}
|
|
|
|
// UploadFile uploads data to the specified bucket and object key.
|
|
func (r *FileRepo) UploadFile(ctx context.Context, bucketName string, objectKey string, data io.Reader) error {
|
|
_, err := r.client.PutObject(ctx, &s3.PutObjectInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(objectKey),
|
|
Body: data,
|
|
})
|
|
return s3errors.Wrap(err)
|
|
}
|
|
|
|
// DownloadFile downloads an object from S3 and returns its body as a ReadCloser.
|
|
func (r *FileRepo) DownloadFile(ctx context.Context, bucketName string, objectKey string) (io.ReadCloser, error) {
|
|
resp, err := r.client.GetObject(ctx, &s3.GetObjectInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(objectKey),
|
|
})
|
|
if err != nil {
|
|
return nil, s3errors.Wrap(err)
|
|
}
|
|
return resp.Body, nil
|
|
}
|
|
|
|
// ListBuckets returns all bucket names.
|
|
func (r *FileRepo) ListBuckets(ctx context.Context) ([]string, error) {
|
|
resp, err := r.client.ListBuckets(ctx, &s3.ListBucketsInput{})
|
|
if err != nil {
|
|
return nil, s3errors.Wrap(err)
|
|
}
|
|
var buckets []string
|
|
for _, b := range resp.Buckets {
|
|
if b.Name != nil {
|
|
buckets = append(buckets, *b.Name)
|
|
}
|
|
}
|
|
return buckets, nil
|
|
}
|
|
|
|
// CreateBucket creates a new S3 bucket.
|
|
func (r *FileRepo) CreateBucket(ctx context.Context, bucketName string) error {
|
|
_, err := r.client.CreateBucket(ctx, &s3.CreateBucketInput{
|
|
Bucket: aws.String(bucketName),
|
|
})
|
|
return s3errors.Wrap(err)
|
|
}
|
|
|
|
// DeleteBucket deletes an S3 bucket.
|
|
func (r *FileRepo) DeleteBucket(ctx context.Context, bucketName string) error {
|
|
_, err := r.client.DeleteBucket(ctx, &s3.DeleteBucketInput{
|
|
Bucket: aws.String(bucketName),
|
|
})
|
|
return s3errors.Wrap(err)
|
|
}
|
|
|
|
// GetFileContent retrieves text file content for preview (e.g., Markdown files).
|
|
func (r *FileRepo) GetFileContent(ctx context.Context, bucketName string, objectKey string) (string, error) {
|
|
resp, err := r.client.GetObject(ctx, &s3.GetObjectInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(objectKey),
|
|
})
|
|
if err != nil {
|
|
return "", s3errors.Wrap(err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
data, err := io.ReadAll(io.LimitReader(resp.Body, maxContentPreviewSize))
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if int64(len(data)) >= maxContentPreviewSize {
|
|
return "", fmt.Errorf("file too large for content preview (max 10MB)")
|
|
}
|
|
return string(data), nil
|
|
}
|
|
|
|
// DeleteFile removes a file from the bucket.
|
|
func (r *FileRepo) DeleteFile(ctx context.Context, bucketName string, objectKey string) error {
|
|
_, err := r.client.DeleteObject(ctx, &s3.DeleteObjectInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(objectKey),
|
|
})
|
|
return s3errors.Wrap(err)
|
|
}
|
|
|
|
// ListObjectsV2 lists files with pagination support.
|
|
func (r *FileRepo) ListObjectsV2(ctx context.Context, bucketName string, prefix string, maxKeys int32, continuationToken *string) (*ListFilesResult, error) {
|
|
input := &s3.ListObjectsV2Input{
|
|
Bucket: aws.String(bucketName),
|
|
Prefix: aws.String(prefix),
|
|
MaxKeys: aws.Int32(maxKeys),
|
|
}
|
|
if continuationToken != nil && *continuationToken != "" {
|
|
input.ContinuationToken = continuationToken
|
|
}
|
|
|
|
resp, err := r.client.ListObjectsV2(ctx, input)
|
|
if err != nil {
|
|
return nil, s3errors.Wrap(err)
|
|
}
|
|
|
|
files := make([]FileInfo, 0, len(resp.Contents))
|
|
for _, obj := range resp.Contents {
|
|
if obj.Key == nil || obj.Size == nil || obj.LastModified == nil || obj.ETag == nil {
|
|
continue
|
|
}
|
|
files = append(files, FileInfo{
|
|
Key: *obj.Key,
|
|
Size: *obj.Size,
|
|
LastModified: *obj.LastModified,
|
|
ETag: *obj.ETag,
|
|
})
|
|
}
|
|
|
|
return &ListFilesResult{
|
|
Files: files,
|
|
NextContinuationToken: resp.NextContinuationToken,
|
|
}, nil
|
|
}
|
|
|
|
// GeneratePresignedURL generates a presigned URL for temporary file access.
|
|
func (r *FileRepo) GeneratePresignedURL(ctx context.Context, bucketName string, objectKey string, expiry time.Duration) (string, error) {
|
|
presignResult, err := r.presignClient.PresignGetObject(ctx, &s3.GetObjectInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(objectKey),
|
|
}, func(opts *s3.PresignOptions) {
|
|
opts.Expires = expiry
|
|
})
|
|
if err != nil {
|
|
return "", s3errors.Wrap(err)
|
|
}
|
|
return presignResult.URL, nil
|
|
}
|
|
|
|
// CreateMultipartUpload initializes a multipart upload session.
|
|
func (r *FileRepo) CreateMultipartUpload(ctx context.Context, bucketName string, objectKey string) (string, error) {
|
|
resp, err := r.client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(objectKey),
|
|
})
|
|
if err != nil {
|
|
return "", s3errors.Wrap(err)
|
|
}
|
|
if resp.UploadId == nil {
|
|
return "", fmt.Errorf("failed to initialize multipart upload")
|
|
}
|
|
return *resp.UploadId, nil
|
|
}
|
|
|
|
// UploadPart uploads a single part of a multipart upload.
|
|
func (r *FileRepo) UploadPart(ctx context.Context, bucketName string, objectKey string, uploadId string, partNumber int32, data io.Reader) (string, error) {
|
|
resp, err := r.client.UploadPart(ctx, &s3.UploadPartInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(objectKey),
|
|
UploadId: aws.String(uploadId),
|
|
PartNumber: aws.Int32(partNumber),
|
|
Body: data,
|
|
})
|
|
if err != nil {
|
|
return "", s3errors.Wrap(err)
|
|
}
|
|
if resp.ETag == nil {
|
|
return "", fmt.Errorf("failed to upload part")
|
|
}
|
|
return *resp.ETag, nil
|
|
}
|
|
|
|
// CompleteMultipartUpload assembles all parts to complete the upload.
|
|
func (r *FileRepo) CompleteMultipartUpload(ctx context.Context, bucketName string, objectKey string, uploadId string, parts []Part) (string, error) {
|
|
sort.Slice(parts, func(i, j int) bool {
|
|
return parts[i].PartNumber < parts[j].PartNumber
|
|
})
|
|
|
|
completedParts := make([]types.CompletedPart, len(parts))
|
|
for i, p := range parts {
|
|
completedParts[i] = types.CompletedPart{
|
|
ETag: aws.String(p.ETag),
|
|
PartNumber: aws.Int32(p.PartNumber),
|
|
}
|
|
}
|
|
|
|
resp, err := r.client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(objectKey),
|
|
UploadId: aws.String(uploadId),
|
|
MultipartUpload: &types.CompletedMultipartUpload{
|
|
Parts: completedParts,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return "", s3errors.Wrap(err)
|
|
}
|
|
if resp.Location == nil {
|
|
return "", fmt.Errorf("failed to complete multipart upload")
|
|
}
|
|
return *resp.Location, nil
|
|
}
|
|
|
|
// AbortMultipartUpload cancels an in-progress multipart upload.
|
|
func (r *FileRepo) AbortMultipartUpload(ctx context.Context, bucketName string, objectKey string, uploadId string) error {
|
|
_, err := r.client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(objectKey),
|
|
UploadId: aws.String(uploadId),
|
|
})
|
|
return s3errors.Wrap(err)
|
|
}
|