package s3

import (
	"context"
	"fmt"
	"io"
	"sort"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"

	"file-system/internal/common"
	"file-system/internal/domain/repository"
)

// S3FileRepository implements repository.FileRepository by delegating every
// operation to the S3 clients held by a RustFSClient.
type S3FileRepository struct {
	client *RustFSClient
}

// NewS3FileRepository returns a repository.FileRepository backed by client.
func NewS3FileRepository(client *RustFSClient) repository.FileRepository {
	return &S3FileRepository{client: client}
}

// UploadFile stores the contents of data under objectKey in bucketName using
// a single PutObject request.
func (r *S3FileRepository) UploadFile(ctx context.Context, bucketName string, objectKey string, data io.Reader) error {
	_, err := r.client.Client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(bucketName),
		Key:    aws.String(objectKey),
		Body:   data,
	})
	return err
}

// DownloadFile fetches objectKey from bucketName and returns its body.
// The body is handed back unread and unclosed, so the caller must close it.
func (r *S3FileRepository) DownloadFile(ctx context.Context, bucketName string, objectKey string) (io.ReadCloser, error) {
	resp, err := r.client.Client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucketName),
		Key:    aws.String(objectKey),
	})
	if err != nil {
		return nil, err
	}
	return resp.Body, nil
}

// ListBuckets returns the names of the buckets reported by the ListBuckets
// call. Buckets reported without a name are skipped.
func (r *S3FileRepository) ListBuckets(ctx context.Context) ([]string, error) {
	resp, err := r.client.Client.ListBuckets(ctx, &s3.ListBucketsInput{})
	if err != nil {
		return nil, err
	}

	var buckets []string
	for _, b := range resp.Buckets {
		if b.Name != nil {
			buckets = append(buckets, *b.Name)
		}
	}
	return buckets, nil
}

// CreateBucket creates a bucket named bucketName.
func (r *S3FileRepository) CreateBucket(ctx context.Context, bucketName string) error {
	_, err := r.client.Client.CreateBucket(ctx, &s3.CreateBucketInput{
		Bucket: aws.String(bucketName),
	})
	return err
}

// DeleteBucket deletes the bucket named bucketName.
func (r *S3FileRepository) DeleteBucket(ctx context.Context, bucketName string) error {
	_, err := r.client.Client.DeleteBucket(ctx, &s3.DeleteBucketInput{
		Bucket: aws.String(bucketName),
	})
	return err
}

// ListObjects returns the keys of all objects in bucketName.
//
// A single ListObjectsV2 response holds only one page of keys, so the call is
// repeated with the returned continuation token until no token is returned.
// Objects reported without a key are skipped. If any page fails, the error is
// returned and the keys gathered so far are discarded.
func (r *S3FileRepository) ListObjects(ctx context.Context, bucketName string) ([]string, error) {
	input := &s3.ListObjectsV2Input{
		Bucket: aws.String(bucketName),
	}

	var objects []string
	for {
		resp, err := r.client.Client.ListObjectsV2(ctx, input)
		if err != nil {
			return nil, err
		}
		for _, obj := range resp.Contents {
			if obj.Key != nil {
				objects = append(objects, *obj.Key)
			}
		}

		// No continuation token means this was the last page.
		next := aws.ToString(resp.NextContinuationToken)
		if next == "" {
			return objects, nil
		}
		input.ContinuationToken = aws.String(next)
	}
}

// ListObjectsV2 lists one page of objects in bucketName whose keys start with
// prefix, requesting at most maxKeys entries.
//
// A nil or empty continuationToken starts from the first page; otherwise the
// token is forwarded to resume a previous listing. The returned result carries
// the token for the next page exactly as the service reported it.
//
// Objects reported without a key are skipped; any other missing attribute is
// left at its zero value instead of being dereferenced.
func (r *S3FileRepository) ListObjectsV2(ctx context.Context, bucketName string, prefix string, maxKeys int32, continuationToken *string) (*repository.ListFilesResult, error) {
	input := &s3.ListObjectsV2Input{
		Bucket:  aws.String(bucketName),
		Prefix:  aws.String(prefix),
		MaxKeys: aws.Int32(maxKeys),
	}
	if continuationToken != nil && *continuationToken != "" {
		input.ContinuationToken = continuationToken
	}

	resp, err := r.client.Client.ListObjectsV2(ctx, input)
	if err != nil {
		return nil, err
	}

	files := make([]repository.FileInfo, 0, len(resp.Contents))
	for _, obj := range resp.Contents {
		if obj.Key == nil {
			continue
		}
		files = append(files, repository.FileInfo{
			Key:          *obj.Key,
			Size:         aws.ToInt64(obj.Size),
			LastModified: aws.ToTime(obj.LastModified),
			ETag:         aws.ToString(obj.ETag),
		})
	}

	return &repository.ListFilesResult{
		Files:                 files,
		NextContinuationToken: resp.NextContinuationToken,
	}, nil
}

// GeneratePresignedURL returns a presigned GET URL for objectKey in
// bucketName, with expiry passed to the presigner as the URL lifetime.
func (r *S3FileRepository) GeneratePresignedURL(ctx context.Context, bucketName string, objectKey string, expiry time.Duration) (string, error) {
	presigned, err := r.client.PresignClient.PresignGetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucketName),
		Key:    aws.String(objectKey),
	}, s3.WithPresignExpires(expiry))
	if err != nil {
		return "", err
	}
	return presigned.URL, nil
}

// CreateMultipartUpload starts a multipart upload for objectKey in bucketName
// and returns the upload ID assigned by the service.
//
// An error is returned if the response carries no upload ID, since the upload
// cannot be continued without one.
func (r *S3FileRepository) CreateMultipartUpload(ctx context.Context, bucketName string, objectKey string) (string, error) {
	resp, err := r.client.Client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket: aws.String(bucketName),
		Key:    aws.String(objectKey),
	})
	if err != nil {
		return "", err
	}
	if resp.UploadId == nil {
		return "", fmt.Errorf("create multipart upload for %q in bucket %q: response has no upload id", objectKey, bucketName)
	}
	return *resp.UploadId, nil
}

// UploadPart uploads data as part partNumber of the multipart upload uploadID
// and returns the ETag reported for that part.
//
// An error is returned if the response carries no ETag, since the part could
// not be referenced when completing the upload.
func (r *S3FileRepository) UploadPart(ctx context.Context, bucketName string, objectKey string, uploadID string, partNumber int32, data io.Reader) (string, error) {
	resp, err := r.client.Client.UploadPart(ctx, &s3.UploadPartInput{
		Bucket:     aws.String(bucketName),
		Key:        aws.String(objectKey),
		UploadId:   aws.String(uploadID),
		PartNumber: aws.Int32(partNumber),
		Body:       data,
	})
	if err != nil {
		return "", err
	}
	if resp.ETag == nil {
		return "", fmt.Errorf("upload part %d of %q in bucket %q: response has no etag", partNumber, objectKey, bucketName)
	}
	return *resp.ETag, nil
}

// CompleteMultipartUpload finishes the multipart upload uploadID from the
// given parts and returns the location reported by the service, or "" when
// the response omits it.
//
// The parts are submitted in ascending PartNumber order. Sorting happens on a
// private copy, so the caller's slice is left untouched.
func (r *S3FileRepository) CompleteMultipartUpload(ctx context.Context, bucketName string, objectKey string, uploadID string, parts []common.Part) (string, error) {
	ordered := make([]common.Part, len(parts))
	copy(ordered, parts)
	sort.Slice(ordered, func(i, j int) bool {
		return ordered[i].PartNumber < ordered[j].PartNumber
	})

	completedParts := make([]types.CompletedPart, len(ordered))
	for i, p := range ordered {
		completedParts[i] = types.CompletedPart{
			ETag:       aws.String(p.ETag),
			PartNumber: aws.Int32(p.PartNumber),
		}
	}

	resp, err := r.client.Client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(bucketName),
		Key:      aws.String(objectKey),
		UploadId: aws.String(uploadID),
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})
	if err != nil {
		return "", err
	}

	// The upload has already succeeded here, so a missing Location is reported
	// as an empty string rather than as a failure.
	return aws.ToString(resp.Location), nil
}

// AbortMultipartUpload cancels the multipart upload uploadID for objectKey in
// bucketName.
func (r *S3FileRepository) AbortMultipartUpload(ctx context.Context, bucketName string, objectKey string, uploadID string) error {
	_, err := r.client.Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
		Bucket:   aws.String(bucketName),
		Key:      aws.String(objectKey),
		UploadId: aws.String(uploadID),
	})
	return err
}