199 lines
5.7 KiB
Go
199 lines
5.7 KiB
Go
package s3
|
|
|
|
import (
|
|
"context"
|
|
"file-system/internal/common"
|
|
"file-system/internal/domain/repository"
|
|
"io"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
|
)
|
|
|
|
type S3FileRepository struct {
|
|
client *RustFSClient
|
|
}
|
|
|
|
func NewS3FileRepository(client *RustFSClient) repository.FileRepository {
|
|
return &S3FileRepository{client: client}
|
|
}
|
|
|
|
func (r *S3FileRepository) UploadFile(ctx context.Context, bucketName string, objectKey string, data io.Reader) error {
|
|
_, err := r.client.Client.PutObject(ctx, &s3.PutObjectInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(objectKey),
|
|
Body: data,
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (r *S3FileRepository) DownloadFile(ctx context.Context, bucketName string, objectKey string) (io.ReadCloser, error) {
|
|
resp, err := r.client.Client.GetObject(ctx, &s3.GetObjectInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(objectKey),
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp.Body, nil
|
|
}
|
|
|
|
func (r *S3FileRepository) ListBuckets(ctx context.Context) ([]string, error) {
|
|
resp, err := r.client.Client.ListBuckets(ctx, &s3.ListBucketsInput{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var buckets []string
|
|
for _, b := range resp.Buckets {
|
|
if b.Name != nil {
|
|
buckets = append(buckets, *b.Name)
|
|
}
|
|
}
|
|
return buckets, nil
|
|
}
|
|
|
|
func (r *S3FileRepository) CreateBucket(ctx context.Context, bucketName string) error {
|
|
_, err := r.client.Client.CreateBucket(ctx, &s3.CreateBucketInput{
|
|
Bucket: aws.String(bucketName),
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (r *S3FileRepository) DeleteBucket(ctx context.Context, bucketName string) error {
|
|
_, err := r.client.Client.DeleteBucket(ctx, &s3.DeleteBucketInput{
|
|
Bucket: aws.String(bucketName),
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (r *S3FileRepository) ListObjects(ctx context.Context, bucketName string) ([]string, error) {
|
|
resp, err := r.client.Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
|
|
Bucket: aws.String(bucketName),
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var objects []string
|
|
for _, obj := range resp.Contents {
|
|
if obj.Key != nil {
|
|
objects = append(objects, *obj.Key)
|
|
}
|
|
}
|
|
return objects, nil
|
|
}
|
|
|
|
// ListObjectsV2 分页列出文件
|
|
func (r *S3FileRepository) ListObjectsV2(ctx context.Context, bucketName string, prefix string, maxKeys int32, continuationToken *string) (*repository.ListFilesResult, error) {
|
|
input := &s3.ListObjectsV2Input{
|
|
Bucket: aws.String(bucketName),
|
|
Prefix: aws.String(prefix),
|
|
MaxKeys: aws.Int32(maxKeys),
|
|
}
|
|
if continuationToken != nil && *continuationToken != "" {
|
|
input.ContinuationToken = continuationToken
|
|
}
|
|
|
|
resp, err := r.client.Client.ListObjectsV2(ctx, input)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
files := make([]repository.FileInfo, 0, len(resp.Contents))
|
|
for _, obj := range resp.Contents {
|
|
files = append(files, repository.FileInfo{
|
|
Key: *obj.Key,
|
|
Size: *obj.Size,
|
|
LastModified: *obj.LastModified,
|
|
ETag: *obj.ETag,
|
|
})
|
|
}
|
|
|
|
return &repository.ListFilesResult{
|
|
Files: files,
|
|
NextContinuationToken: resp.NextContinuationToken,
|
|
}, nil
|
|
}
|
|
|
|
// GeneratePresignedURL 生成预签名链接
|
|
func (r *S3FileRepository) GeneratePresignedURL(ctx context.Context, bucketName string, objectKey string, expiry time.Duration) (string, error) {
|
|
presignResult, err := r.client.PresignClient.PresignGetObject(ctx, &s3.GetObjectInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(objectKey),
|
|
}, func(opts *s3.PresignOptions) {
|
|
opts.Expires = expiry
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return presignResult.URL, nil
|
|
}
|
|
|
|
// CreateMultipartUpload 初始化分片上传
|
|
func (r *S3FileRepository) CreateMultipartUpload(ctx context.Context, bucketName string, objectKey string) (string, error) {
|
|
resp, err := r.client.Client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(objectKey),
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return *resp.UploadId, nil
|
|
}
|
|
|
|
// UploadPart 上传分片
|
|
func (r *S3FileRepository) UploadPart(ctx context.Context, bucketName string, objectKey string, uploadId string, partNumber int32, data io.Reader) (string, error) {
|
|
resp, err := r.client.Client.UploadPart(ctx, &s3.UploadPartInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(objectKey),
|
|
UploadId: aws.String(uploadId),
|
|
PartNumber: aws.Int32(partNumber),
|
|
Body: data,
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return *resp.ETag, nil
|
|
}
|
|
|
|
// CompleteMultipartUpload 完成分片上传
|
|
func (r *S3FileRepository) CompleteMultipartUpload(ctx context.Context, bucketName string, objectKey string, uploadId string, parts []common.Part) (string, error) {
|
|
// 需要按 PartNumber 排序
|
|
sort.Slice(parts, func(i, j int) bool {
|
|
return parts[i].PartNumber < parts[j].PartNumber
|
|
})
|
|
|
|
completedParts := make([]types.CompletedPart, len(parts))
|
|
for i, p := range parts {
|
|
completedParts[i] = types.CompletedPart{
|
|
ETag: aws.String(p.ETag),
|
|
PartNumber: aws.Int32(p.PartNumber),
|
|
}
|
|
}
|
|
|
|
resp, err := r.client.Client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(objectKey),
|
|
UploadId: aws.String(uploadId),
|
|
MultipartUpload: &types.CompletedMultipartUpload{
|
|
Parts: completedParts,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return *resp.Location, nil
|
|
}
|
|
|
|
// AbortMultipartUpload 取消分片上传
|
|
func (r *S3FileRepository) AbortMultipartUpload(ctx context.Context, bucketName string, objectKey string, uploadId string) error {
|
|
_, err := r.client.Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
|
|
Bucket: aws.String(bucketName),
|
|
Key: aws.String(objectKey),
|
|
UploadId: aws.String(uploadId),
|
|
})
|
|
return err
|
|
}
|