file-system/docs/superpowers/plans/2026-05-25-kratos-watermill-migration.md


file-system Kratos + Watermill CQRS Migration Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Migrate file-system from Gin + raw SQL + a custom Mediator to Kratos + Watermill CQRS + GORM, making it the gRPC storage abstraction layer for rag-backend.

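Architecture: Kratos serves both HTTP and gRPC, with Wire providing compile-time DI. Watermill CQRS (CommandBus + EventBus) replaces the custom Mediator, and its messages are stored in PostgreSQL. GORM replaces raw SQL for business data. S3 operations keep using AWS SDK v2 unchanged. file-system talks only to rag-backend, so API Key authentication is removed and only JWT remains.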

Tech Stack: Go 1.25, Kratos (HTTP+gRPC), Wire (DI), Watermill (CQRS), GORM (ORM), AWS SDK v2 (S3), PostgreSQL, OpenTelemetry


Target project structure

file-system/
├── api/file/v1/                           # Protobuf definitions + generated code
│   ├── file.proto                         # File service proto (HTTP+gRPC annotations)
│   ├── file.pb.go                         # Generated: messages
│   ├── file_grpc.pb.go                    # Generated: gRPC stubs
│   ├── file_http.pb.go                    # Generated: HTTP routes
│   └── error_reason.pb.go                 # Generated: error enum
├── api/auth/v1/                           # Auth proto (kept for the gRPC client)
│   └── auth.proto
├── cmd/server/
│   ├── main.go                            # Entry point: loads config, starts the Kratos App
│   ├── wire.go                            # Wire injector declarations
│   └── wire_gen.go                        # Wire-generated code
├── configs/
│   └── config.yaml                        # Sample config for local development
├── internal/
│   ├── conf/
│   │   ├── conf.proto                     # Proto definition of the config structure
│   │   └── conf.pb.go                     # Generated: config structs
│   ├── biz/                               # Business logic layer (DDD domain)
│   │   ├── biz.go                         # ProviderSet + Transaction interface
│   │   ├── file.go                        # File usecase (orchestrates repo + event)
│   │   ├── bucket.go                      # Bucket usecase
│   │   ├── folder.go                      # Folder usecase
│   │   └── share.go                       # Share usecase
│   ├── data/                              # Data access layer (implements the biz repo interfaces)
│   │   ├── data.go                        # Data struct (GORM DB, transaction management)
│   │   ├── file_repo.go                   # S3 file repo (keeps AWS SDK v2)
│   │   ├── folder_repo.go                 # PG folder repo (GORM)
│   │   ├── file_meta_repo.go              # PG file meta repo (GORM)
│   │   └── share_repo.go                  # PG share repo (GORM)
│   ├── service/                           # Service implementation layer (DDD application)
│   │   ├── service.go                     # ProviderSet
│   │   └── file.go                        # Implements the proto Service interface
│   ├── server/                            # HTTP/gRPC server construction
│   │   ├── server.go                      # ProviderSet
│   │   ├── http.go                        # NewHTTPServer
│   │   └── grpc.go                        # NewGRPCServer
│   ├── watermark/                         # Watermill CQRS setup
│   │   ├── cqrs_setup.go                  # CommandBus + EventBus + Processors
│   │   ├── commands.go                    # Command struct definitions
│   │   ├── events.go                      # Event struct definitions
│   │   └── handlers.go                    # Command/Event handler implementations
│   └── pkg/                               # Internal shared utilities
│       ├── sanitize/                      # Input sanitization (kept)
│       │   └── sanitize.go
│       └── s3errors/                      # S3 error mapping (kept)
│           └── s3errors.go
├── third_party/                           # Third-party protos (google/api)
│   └── google/api/
│       ├── annotations.proto
│       └── http.proto
├── Makefile
├── Dockerfile
├── docker-compose.yml
└── go.mod

Migration mapping (old → new)

| Old file | New file | Notes |
| --- | --- | --- |
| internal/api/handlers/*_commands.go | internal/watermark/commands.go | Command structs merged into one file |
| internal/api/handlers/*_queries.go | internal/biz/*.go | Query logic inlined into the usecases |
| internal/api/handlers/*_handlers.go | internal/watermark/handlers.go + internal/biz/*.go | Split: CQRS handler → Watermill, business logic → biz |
| internal/api/endpoints/*.go | internal/service/file.go | Gin endpoint → Kratos service |
| internal/api/requests/*.go | api/file/v1/file.proto | Request DTO → proto message |
| internal/api/validators/*.go | proto validator + internal/service/file.go | Merged into proto validation and the service layer |
| internal/infrastructure/mediator/ | internal/watermark/ | Custom Mediator → Watermill CQRS |
| internal/infrastructure/database/postgres.go | internal/data/data.go | raw SQL → GORM |
| internal/infrastructure/repository/*.go | internal/data/*.go | raw SQL repo → GORM repo |
| internal/infrastructure/s3/ | internal/data/file_repo.go | S3 operations kept, moved to the data layer |
| internal/infrastructure/grpc/auth_client.go | internal/server/grpc.go or middleware | gRPC client kept for JWT verification |
| internal/middleware/*.go | internal/server/http.go + internal/server/grpc.go | Gin middleware → Kratos middleware |
| internal/common/config.go | internal/conf/conf.proto + configs/config.yaml | Environment variables → Kratos config |
| internal/common/errors.go | api/file/v1/error_reason.proto | BusinessException → proto error enum |
| internal/common/sanitize.go | internal/pkg/sanitize/sanitize.go | Kept, moved to pkg |
| internal/common/s3_errors.go | internal/pkg/s3errors/s3errors.go | Kept, moved to pkg |
| internal/domain/model/*.go | internal/biz/*.go (inlined) | Domain model → biz-layer entities |
| internal/domain/repository/*.go | internal/biz/*.go (interface definitions) | Repo interfaces → defined in the biz layer |
| cmd/server/main.go (277 lines) | cmd/server/main.go (~30 lines) + Wire | Manual DI → Wire auto-injection |

Task 1: Initialize the Kratos project skeleton

Files:

  • Create: third_party/google/api/annotations.proto

  • Create: third_party/google/api/http.proto

  • Create: internal/conf/conf.proto

  • Create: configs/config.yaml

  • Create: Makefile

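  • Replace: buf.yaml, buf.gen.yaml (rewritten in Steps 2 and 3)

  • Delete: generated Swagger docs under docs/ (keep docs/superpowers/, which holds this plan)

  • Delete: internal/api/ (the entire old API layer)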

  • Delete: internal/infrastructure/mediator/

  • Delete: internal/middleware/

  • Delete: internal/common/

  • Step 1: Create the Makefile

.PHONY: api config errors wire build run test clean

# Generate proto API code (HTTP + gRPC)
api:
	buf generate

# Generate config structs (same command as Step 9)
config:
	buf generate --path internal/conf/conf.proto

# Generate the error enum
errors:
	buf generate --path api/file/v1/error_reason.proto

# Wire dependency injection
wire:
	cd cmd/server && wire

# Build
build:
	go build -o ./bin/file-system ./cmd/server

# Run
run:
	go run ./cmd/server

# Test
test:
	go test ./...

# Clean
clean:
	rm -rf bin/
  • Step 2: Create buf.yaml (project root)
version: v2
modules:
  - path: api
  - path: internal/conf
  - path: third_party
lint:
  use:
    - DEFAULT
breaking:
  use:
    - FILE
  • Step 3: Create buf.gen.yaml
version: v2
managed:
  enabled: true
  override:
    - file_option: go_package_prefix
      value: rag/file-system
plugins:
  - remote: buf.build/protocolbuffers/go
    out: .
    opt: paths=source_relative
  - remote: buf.build/grpc/go
    out: .
    opt: paths=source_relative
  # Assumption: the Kratos generators (protoc-gen-go-http, protoc-gen-go-errors)
  # are installed locally with go install and are on PATH.
  - local: protoc-gen-go-http
    out: .
    opt: paths=source_relative
  - local: protoc-gen-go-errors
    out: .
    opt: paths=source_relative
  • Step 4: Create third_party/google/api/annotations.proto

Download from https://github.com/googleapis/googleapis/raw/master/google/api/annotations.proto; the file contains the standard google.api.http annotation.

  • Step 5: Create third_party/google/api/http.proto

Download from https://github.com/googleapis/googleapis/raw/master/google/api/http.proto.

  • Step 6: Create the config proto (internal/conf/conf.proto)
syntax = "proto3";

package conf;

option go_package = "rag/file-system/internal/conf";

import "google/protobuf/duration.proto";

message Bootstrap {
  Server server = 1;
  Data data = 2;
  Auth auth = 3;
}

message Server {
  message HTTP {
    string addr = 1;
    google.protobuf.Duration timeout = 2;
  }
  message GRPC {
    string addr = 1;
    google.protobuf.Duration timeout = 2;
  }
  HTTP http = 1;
  GRPC grpc = 2;
}

message Data {
  message Database {
    string driver = 1;
    string source = 2;
  }
  message S3 {
    string endpoint = 1;
    string access_key = 2;
    string secret_key = 3;
    string region = 4;
  }
  Database database = 1;
  S3 s3 = 2;
}

message Auth {
  string jwt_key = 1;
  string grpc_addr = 2;
}
  • Step 7: Create the sample local config (configs/config.yaml)
server:
  http:
    addr: 0.0.0.0:8080
    timeout: 30s
  grpc:
    addr: 0.0.0.0:9000
    timeout: 30s

data:
  database:
    driver: postgres
    source: "postgres://postgres:postgres@localhost:5432/file_system?sslmode=disable"
  s3:
    endpoint: "http://192.168.1.154:9000"
    access_key: "${RUSTFS_ACCESS_KEY_ID}"
    secret_key: "${RUSTFS_SECRET_ACCESS_KEY}"
    region: "us-east-1"

auth:
  jwt_key: "RagJwtSecretKey2026MustBeAtLeast32CharsLong!"
  grpc_addr: "rag-backend:50051"
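  • Step 8: Download the third_party proto files
mkdir -p third_party/google/api
curl -sL https://raw.githubusercontent.com/googleapis/googleapis/master/google/api/annotations.proto -o third_party/google/api/annotations.proto
curl -sL https://raw.githubusercontent.com/googleapis/googleapis/master/google/api/http.proto -o third_party/google/api/http.proto
# Note: error_reason.proto imports errors/errors.proto and file.proto imports
# validate/validate.proto. Both must also be resolvable (for example, placed
# under third_party/) before the Task 2 protos can be generated.
  • Step 9: Generate the config structs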
cd /Users/wen/project/rag/file-system && buf generate --path internal/conf/conf.proto

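Expected: internal/conf/conf.pb.go is generated successfully.

  • Step 10: Delete the old files
rm -rf internal/api/
rm -rf internal/infrastructure/mediator/
rm -rf internal/middleware/
rm -rf internal/common/
# Remove only the generated Swagger artifacts: docs/ also contains this plan
# (docs/superpowers/plans/). The file names assume swag's default output.
rm -f docs/docs.go docs/swagger.json docs/swagger.yaml
# buf.yaml and buf.gen.yaml were rewritten in Steps 2 and 3, so they are not deleted here.

Note: keep internal/infrastructure/s3/, internal/infrastructure/grpc/, internal/infrastructure/repository/, and internal/infrastructure/database/; later tasks replace them step by step.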

  • Step 11: Commit
git add -A
git commit -m "chore: initialize Kratos project skeleton, add proto config and Makefile"

Task 2: Define the file service proto API

Files:

  • Create: api/file/v1/file.proto

  • Create: api/file/v1/error_reason.proto

  • Step 1: Create the error enum proto (api/file/v1/error_reason.proto)

syntax = "proto3";

package api.file.v1;

option go_package = "rag/file-system/api/file/v1";

import "errors/errors.proto";

enum ErrorReason {
  option (errors.default_code) = 500;

  BUCKET_NOT_FOUND = 0 [(errors.code) = 404];
  FILE_NOT_FOUND = 1 [(errors.code) = 404];
  FOLDER_NOT_FOUND = 2 [(errors.code) = 404];
  SHARE_NOT_FOUND = 3 [(errors.code) = 404];
  INVALID_PARAMETER = 4 [(errors.code) = 400];
  PATH_TRAVERSAL_DETECTED = 5 [(errors.code) = 400];
  INVALID_BUCKET_NAME = 6 [(errors.code) = 400];
  STORAGE_OPERATION_FAILED = 7 [(errors.code) = 500];
  SHARE_PASSWORD_REQUIRED = 8 [(errors.code) = 401];
  SHARE_EXPIRED = 9 [(errors.code) = 410];
  SHARE_DOWNLOAD_LIMIT_REACHED = 10 [(errors.code) = 429];
  FOLDER_NAME_CONFLICT = 11 [(errors.code) = 409];
}
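The status mapping encoded in the enum can be restated as a plain Go table, which is handy when reviewing or unit-testing the intended codes. This is an illustrative sketch only: `reasonHTTPCode` and `httpCodeFor` are hypothetical names, not generated code, and the real helpers are expected to come from the Kratos errors generator.

```go
package main

import (
	"fmt"
	"net/http"
)

// reasonHTTPCode mirrors the (errors.code) options in error_reason.proto.
// Hypothetical table for illustration; it is not produced by any generator.
var reasonHTTPCode = map[string]int{
	"BUCKET_NOT_FOUND":             http.StatusNotFound,
	"FILE_NOT_FOUND":               http.StatusNotFound,
	"FOLDER_NOT_FOUND":             http.StatusNotFound,
	"SHARE_NOT_FOUND":              http.StatusNotFound,
	"INVALID_PARAMETER":            http.StatusBadRequest,
	"PATH_TRAVERSAL_DETECTED":      http.StatusBadRequest,
	"INVALID_BUCKET_NAME":          http.StatusBadRequest,
	"STORAGE_OPERATION_FAILED":     http.StatusInternalServerError,
	"SHARE_PASSWORD_REQUIRED":      http.StatusUnauthorized,
	"SHARE_EXPIRED":                http.StatusGone,
	"SHARE_DOWNLOAD_LIMIT_REACHED": http.StatusTooManyRequests,
	"FOLDER_NAME_CONFLICT":         http.StatusConflict,
}

// httpCodeFor returns the mapped status, falling back to 500 to match
// the enum's default_code option.
func httpCodeFor(reason string) int {
	if code, ok := reasonHTTPCode[reason]; ok {
		return code
	}
	return http.StatusInternalServerError
}

func main() {
	fmt.Println(httpCodeFor("SHARE_EXPIRED"))
	fmt.Println(httpCodeFor("SOMETHING_UNMAPPED"))
}
```

Unmapped reasons fall through to 500, matching the enum's default_code.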
  • Step 2: Create the file service proto (api/file/v1/file.proto)

This proto defines every gRPC/HTTP endpoint that file-system exposes to rag-backend.

syntax = "proto3";

package api.file.v1;

option go_package = "rag/file-system/api/file/v1";

import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
import "validate/validate.proto";

service FileService {
  // === File operations ===
  rpc UploadFile (UploadFileRequest) returns (UploadFileResponse) {
    option (google.api.http) = { post: "/files/upload" body: "*" };
  }
  rpc DownloadFile (DownloadFileRequest) returns (DownloadFileResponse) {
    option (google.api.http) = { get: "/files/download" };
  }
  rpc ListFiles (ListFilesRequest) returns (ListFilesResponse) {
    option (google.api.http) = { get: "/files/list" };
  }
  rpc GetFilePreview (GetFilePreviewRequest) returns (GetFilePreviewResponse) {
    option (google.api.http) = { get: "/files/preview" };
  }
  rpc GetFileContent (GetFileContentRequest) returns (GetFileContentResponse) {
    option (google.api.http) = { get: "/files/content" };
  }
  rpc DeleteFile (DeleteFileRequest) returns (google.protobuf.Empty) {
    option (google.api.http) = { delete: "/files/delete" };
  }

  // === Multipart upload ===
  rpc InitMultipartUpload (InitMultipartRequest) returns (InitMultipartResponse) {
    option (google.api.http) = { post: "/files/multipart/init" body: "*" };
  }
  rpc UploadPart (UploadPartRequest) returns (UploadPartResponse) {
    option (google.api.http) = { put: "/files/multipart/part" body: "*" };
  }
  rpc CompleteMultipartUpload (CompleteMultipartRequest) returns (CompleteMultipartResponse) {
    option (google.api.http) = { post: "/files/multipart/complete" body: "*" };
  }
  rpc AbortMultipartUpload (AbortMultipartRequest) returns (google.protobuf.Empty) {
    option (google.api.http) = { post: "/files/multipart/abort" body: "*" };
  }

  // === Bucket operations ===
  rpc CreateBucket (CreateBucketRequest) returns (google.protobuf.Empty) {
    option (google.api.http) = { post: "/buckets" body: "*" };
  }
  rpc ListBuckets (google.protobuf.Empty) returns (ListBucketsResponse) {
    option (google.api.http) = { get: "/buckets" };
  }
  rpc DeleteBucket (DeleteBucketRequest) returns (google.protobuf.Empty) {
    option (google.api.http) = { delete: "/buckets" };
  }

  // === Folder operations ===
  rpc CreateFolder (CreateFolderRequest) returns (Folder) {
    option (google.api.http) = { post: "/folders" body: "*" };
  }
  rpc GetFolderTree (GetFolderTreeRequest) returns (GetFolderTreeResponse) {
    option (google.api.http) = { get: "/folders/tree" };
  }
  rpc GetFolder (GetFolderRequest) returns (FolderWithChildren) {
    option (google.api.http) = { get: "/folders/{id}" };
  }
  rpc RenameFolder (RenameFolderRequest) returns (Folder) {
    option (google.api.http) = { put: "/folders/{id}" body: "*" };
  }
  rpc DeleteFolder (DeleteFolderRequest) returns (google.protobuf.Empty) {
    option (google.api.http) = { delete: "/folders/{id}" };
  }
  rpc UploadToFolder (UploadToFolderRequest) returns (FileMeta) {
    option (google.api.http) = { post: "/folders/{folder_id}/files" body: "*" };
  }
  rpc MoveFile (MoveFileRequest) returns (google.protobuf.Empty) {
    option (google.api.http) = { post: "/files/{id}/move" body: "*" };
  }

  // === Share operations ===
  rpc CreateShare (CreateShareRequest) returns (ShareLink) {
    option (google.api.http) = { post: "/share" body: "*" };
  }
  rpc DeleteShare (DeleteShareRequest) returns (google.protobuf.Empty) {
    option (google.api.http) = { delete: "/share/{id}" };
  }
  rpc GetShareInfo (GetShareInfoRequest) returns (ShareInfo) {
    option (google.api.http) = { get: "/share/{token}" };
  }
  rpc DownloadShare (DownloadShareRequest) returns (DownloadShareResponse) {
    option (google.api.http) = { post: "/share/{token}/download" body: "*" };
  }
}

// === File messages ===

message UploadFileRequest {
  string bucket_name = 1 [(validate.rules).string.min_len = 1];
  string object_key = 2 [(validate.rules).string.min_len = 1];
  bytes data = 3;
  string content_type = 4;
}

message UploadFileResponse {
  string message = 1;
  string object_key = 2;
}

message DownloadFileRequest {
  string bucket_name = 1;
  string object_key = 2;
}

message DownloadFileResponse {
  bytes data = 1;
  string content_type = 2;
  string file_name = 3;
}

message ListFilesRequest {
  string bucket_name = 1;
  string prefix = 2;
  int32 max_keys = 3;
  string continuation_token = 4;
}

message FileInfo {
  string key = 1;
  int64 size = 2;
  string last_modified = 3;
  string etag = 4;
}

message ListFilesResponse {
  repeated FileInfo files = 1;
  string next_continuation_token = 2;
}

message GetFilePreviewRequest {
  string bucket_name = 1;
  string object_key = 2;
}

message GetFilePreviewResponse {
  string presigned_url = 1;
}

message GetFileContentRequest {
  string bucket_name = 1;
  string object_key = 2;
}

message GetFileContentResponse {
  string content = 1;
}

message DeleteFileRequest {
  string bucket_name = 1;
  string object_key = 2;
}

// === Multipart upload messages ===

message InitMultipartRequest {
  string bucket_name = 1;
  string object_key = 2;
}

message InitMultipartResponse {
  string upload_id = 1;
}

message UploadPartRequest {
  string bucket_name = 1;
  string object_key = 2;
  string upload_id = 3;
  int32 part_number = 4;
  bytes data = 5;
}

message UploadPartResponse {
  string etag = 1;
}

message CompletedPart {
  int32 part_number = 1;
  string etag = 2;
}

message CompleteMultipartRequest {
  string bucket_name = 1;
  string object_key = 2;
  string upload_id = 3;
  repeated CompletedPart parts = 4;
}

message CompleteMultipartResponse {
  string location = 1;
}

message AbortMultipartRequest {
  string bucket_name = 1;
  string object_key = 2;
  string upload_id = 3;
}

// === Bucket messages ===

message CreateBucketRequest {
  string name = 1 [(validate.rules).string = {min_len: 3, max_len: 63}];
}

message ListBucketsResponse {
  repeated string buckets = 1;
}

message DeleteBucketRequest {
  string name = 1;
}

// === Folder messages ===

message Folder {
  string id = 1;
  string parent_id = 2;
  string name = 3;
  string owner_id = 4;
  string created_at = 5;
  string updated_at = 6;
}

message FolderWithChildren {
  Folder folder = 1;
  repeated Folder sub_folders = 2;
  repeated FileMeta files = 3;
}

message CreateFolderRequest {
  string parent_id = 1;
  string name = 2 [(validate.rules).string.min_len = 1];
  string owner_id = 3;
}

message GetFolderTreeRequest {
  string owner_id = 1;
}

message GetFolderTreeResponse {
  repeated Folder folders = 1;
}

message GetFolderRequest {
  string id = 1;
  string owner_id = 2;
}

message RenameFolderRequest {
  string id = 1;
  string name = 2 [(validate.rules).string.min_len = 1];
  string owner_id = 3;
}

message DeleteFolderRequest {
  string id = 1;
  string owner_id = 2;
}

message FileMeta {
  string id = 1;
  string folder_id = 2;
  string name = 3;
  string s3_key = 4;
  string s3_bucket = 5;
  int64 size = 6;
  string content_type = 7;
  string owner_id = 8;
  string created_at = 9;
  string updated_at = 10;
}

message UploadToFolderRequest {
  string folder_id = 1;
  string file_name = 2;
  bytes data = 3;
  string content_type = 4;
  string owner_id = 5;
}

message MoveFileRequest {
  string id = 1;
  string target_folder_id = 2;
  string owner_id = 3;
}

// === Share messages ===

message ShareLink {
  string id = 1;
  string resource_type = 2;
  string resource_id = 3;
  string token = 4;
  string password = 5;
  string expires_at = 6;
  int32 download_count = 7;
  int32 max_downloads = 8;
  string created_by = 9;
  string created_at = 10;
}

message ShareInfo {
  string token = 1;
  string resource_type = 2;
  string file_name = 3;
  int64 file_size = 4;
  bool has_password = 5;
  string expires_at = 6;
}

message CreateShareRequest {
  string resource_type = 1;
  string resource_id = 2;
  string password = 3;
  string expires_at = 4;
  int32 max_downloads = 5;
  string created_by = 6;
}

message DeleteShareRequest {
  string id = 1;
  string created_by = 2;
}

message GetShareInfoRequest {
  string token = 1;
}

message DownloadShareRequest {
  string token = 1;
  string password = 2;
}

message DownloadShareResponse {
  string presigned_url = 1;
  string file_name = 2;
}
  • Step 3: Generate the proto code
cd /Users/wen/project/rag/file-system && buf generate

Expected: generates api/file/v1/file.pb.go, api/file/v1/file_grpc.pb.go, api/file/v1/file_http.pb.go, api/file/v1/error_reason.pb.go

  • Step 4: Commit
git add -A
git commit -m "feat: define file service proto API with HTTP+gRPC annotations"

Task 3: Update go.mod (add Kratos + Watermill + GORM)

Files:

  • Modify: go.mod

  • Step 1: Add the new dependencies and clean up the old ones

cd /Users/wen/project/rag/file-system

# Add new dependencies
go get github.com/go-kratos/kratos/v2@latest
go get github.com/go-kratos/kratos/v2/transport/http@latest
go get github.com/go-kratos/kratos/v2/transport/grpc@latest
go get github.com/go-kratos/kratos/v2/config@latest
go get github.com/go-kratos/kratos/v2/config/file@latest
go get github.com/go-kratos/kratos/v2/log@latest
go get github.com/go-kratos/kratos/v2/middleware/logging@latest
go get github.com/go-kratos/kratos/v2/middleware/recovery@latest
go get github.com/go-kratos/kratos/v2/middleware/tracing@latest
go get github.com/go-kratos/kratos/v2/middleware/validate@latest
go get github.com/go-kratos/kratos/v2/middleware/auth/jwt@latest
# Wire: the library goes into go.mod, the CLI is installed as a tool
go get github.com/google/wire@latest
go install github.com/google/wire/cmd/wire@latest
go get github.com/ThreeDotsLabs/watermill@latest
go get github.com/ThreeDotsLabs/watermill-sql/v2@latest
go get github.com/ThreeDotsLabs/watermill/components/cqrs@latest
go get gorm.io/gorm@latest
go get gorm.io/driver/postgres@latest
# The contrib registry is its own module, outside kratos/v2
go get github.com/go-kratos/kratos/contrib/registry/consul/v2@latest

# Clean up old dependencies
go mod tidy
  • Step 2: Verify that the dependencies installed correctly
go mod verify

Expected: all modules verified

  • Step 3: Commit
git add go.mod go.sum
git commit -m "chore: add Kratos, Watermill, GORM dependencies; remove Gin, Swagger"

Task 4: Create the shared utility packages (keeping the old logic)

Files:

  • Create: internal/pkg/sanitize/sanitize.go

  • Create: internal/pkg/sanitize/sanitize_test.go

  • Create: internal/pkg/s3errors/s3errors.go

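  • Step 1: Create the sanitize package (internal/pkg/sanitize/sanitize.go)

Migrate directly from internal/common/sanitize.go and rename the package to sanitize. If Task 1 already removed internal/common/, recover the file from git history.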

package sanitize

import (
	"errors"
	"regexp"
	"strings"
)

var bucketNameRegex = regexp.MustCompile(`^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$`)

func ObjectKey(key string) error {
	if strings.Contains(key, "..") || strings.Contains(key, "//") || strings.HasPrefix(key, "/") {
		return errors.New("invalid object key: path traversal detected")
	}
	return nil
}

func BucketName(name string) error {
	// Check the length first so this more specific message is reachable;
	// the regex below also enforces 3-63 characters.
	if len(name) < 3 || len(name) > 63 {
		return errors.New("invalid bucket name: must be between 3 and 63 characters")
	}
	if !bucketNameRegex.MatchString(name) {
		return errors.New("invalid bucket name: must be 3-63 lowercase letters, digits, hyphens, or dots")
	}
	return nil
}

func Filename(name string) string {
	safe := strings.ReplaceAll(name, `"`, `\"`)
	safe = strings.ReplaceAll(safe, "\r", "")
	safe = strings.ReplaceAll(safe, "\n", "")
	return safe
}
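One place the Filename helper is likely to matter is a Content-Disposition header on download responses. The sketch below assumes that usage; `contentDisposition` is a hypothetical helper, and `filename` is a local copy of sanitize.Filename so the example stands alone.

```go
package main

import (
	"fmt"
	"strings"
)

// filename is a local copy of sanitize.Filename, repeated here only so
// the sketch compiles on its own.
func filename(name string) string {
	safe := strings.ReplaceAll(name, `"`, `\"`)
	safe = strings.ReplaceAll(safe, "\r", "")
	safe = strings.ReplaceAll(safe, "\n", "")
	return safe
}

// contentDisposition is a hypothetical helper: it builds a header value
// from a user-supplied file name. Stripping CR and LF prevents header
// injection; escaping the quote keeps the quoted string well-formed.
func contentDisposition(name string) string {
	return fmt.Sprintf(`attachment; filename="%s"`, filename(name))
}

func main() {
	fmt.Println(contentDisposition("report\r\nX-Evil: 1.pdf"))
	fmt.Println(contentDisposition(`my "final" draft.txt`))
}
```

Note that the helper does not escape backslashes, so a name ending in a backslash would still need extra handling before being quoted.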
  • Step 2: Create the sanitize tests (internal/pkg/sanitize/sanitize_test.go)

Migrate from internal/common/sanitize_test.go, adjusting the calls to sanitize.ObjectKey() and so on.

package sanitize

import "testing"

func TestObjectKey(t *testing.T) {
	tests := []struct {
		name    string
		key     string
		wantErr bool
	}{
		{"valid", "path/to/file.txt", false},
		{"traversal", "../etc/passwd", true},
		{"double_slash", "path//file", true},
		{"leading_slash", "/absolute/path", true},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			err := ObjectKey(tt.key)
			if (err != nil) != tt.wantErr {
				t.Errorf("ObjectKey(%q) error = %v, wantErr %v", tt.key, err, tt.wantErr)
			}
		})
	}
}

func TestBucketName(t *testing.T) {
	tests := []struct {
		name    string
		input   string
		wantErr bool
	}{
		{"valid", "my-bucket", false},
		{"too_short", "ab", true},
		{"uppercase", "MyBucket", true},
		{"valid_dots", "my.bucket.test", false},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			err := BucketName(tt.input)
			if (err != nil) != tt.wantErr {
				t.Errorf("BucketName(%q) error = %v, wantErr %v", tt.input, err, tt.wantErr)
			}
		})
	}
}
  • Step 3: Run the tests
cd /Users/wen/project/rag/file-system && go test ./internal/pkg/sanitize/ -v

Expected: PASS

  • Step 4: Create the s3errors package (internal/pkg/s3errors/s3errors.go)
package s3errors

import (
	"errors"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

func Wrap(err error) error {
	if err == nil {
		return nil
	}
	var (
		noSuchBucket *types.NoSuchBucket
		noSuchKey    *types.NoSuchKey
		notFound     *types.NotFound
	)
	if errors.As(err, &noSuchBucket) || errors.As(err, &noSuchKey) || errors.As(err, &notFound) {
		return fmt.Errorf("resource not found: %w", err)
	}
	return fmt.Errorf("storage operation failed: %w", err)
}

func IsNotFound(err error) bool {
	var (
		noSuchBucket *types.NoSuchBucket
		noSuchKey    *types.NoSuchKey
		notFound     *types.NotFound
	)
	return errors.As(err, &noSuchBucket) || errors.As(err, &noSuchKey) || errors.As(err, &notFound)
}
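Because Wrap uses %w, IsNotFound still recognizes an error after it has been wrapped. The sketch below demonstrates that property with a stand-in error type; `noSuchKey`, `wrap`, `isNotFound`, and `errTimeout` are hypothetical local names that replace the SDK types so the example needs only the standard library.

```go
package main

import (
	"errors"
	"fmt"
)

// noSuchKey stands in for the SDK's *types.NoSuchKey in this sketch.
type noSuchKey struct{ key string }

func (e *noSuchKey) Error() string { return "no such key: " + e.key }

// errTimeout represents any non-not-found storage failure.
var errTimeout = errors.New("timeout")

// wrap follows the same shape as s3errors.Wrap: classify, then wrap with %w
// so the original error stays reachable through errors.As.
func wrap(err error) error {
	if err == nil {
		return nil
	}
	if isNotFound(err) {
		return fmt.Errorf("resource not found: %w", err)
	}
	return fmt.Errorf("storage operation failed: %w", err)
}

// isNotFound walks the wrap chain looking for the not-found type.
func isNotFound(err error) bool {
	var nsk *noSuchKey
	return errors.As(err, &nsk)
}

func main() {
	wrapped := wrap(&noSuchKey{key: "docs/a.txt"})
	fmt.Println(wrapped)
	fmt.Println(isNotFound(wrapped), isNotFound(wrap(errTimeout)))
}
```

A caller in the biz layer could therefore branch on IsNotFound even when it only ever sees wrapped errors.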
  • Step 5: Commit
git add internal/pkg/
git commit -m "feat: add shared sanitize and s3errors packages"

Task 5: Create the data layer (GORM + S3)

Files:

  • Create: internal/data/data.go

  • Create: internal/data/file_repo.go

  • Create: internal/data/folder_repo.go

  • Create: internal/data/file_meta_repo.go

  • Create: internal/data/share_repo.go

  • Delete: internal/infrastructure/database/postgres.go

  • Delete: internal/infrastructure/repository/

  • Delete: internal/infrastructure/s3/

  • Step 1: Create data.go (GORM database + transaction management)

package data

import (
	"context"

	"rag/file-system/internal/conf"

	"github.com/go-kratos/kratos/v2/log"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

type Data struct {
	db  *gorm.DB
	log *log.Helper
}

type contextTxKey struct{}

func NewData(c *conf.Data, logger log.Logger) (*Data, func(), error) {
	helper := log.NewHelper(logger)
	db, err := gorm.Open(postgres.Open(c.Database.Source), &gorm.Config{})
	if err != nil {
		return nil, nil, err
	}

	sqlDB, err := db.DB()
	if err != nil {
		return nil, nil, err
	}
	sqlDB.SetMaxOpenConns(25)
	sqlDB.SetMaxIdleConns(5)

	// Auto-migrate the schema
	if err := db.AutoMigrate(&FolderPO{}, &FileMetaPO{}, &ShareLinkPO{}); err != nil {
		return nil, nil, err
	}

	helper.Info("connected to PostgreSQL via GORM")
	cleanup := func() {
		sqlDB.Close()
	}
	return &Data{db: db, log: helper}, cleanup, nil
}

func (d *Data) DB(ctx context.Context) *gorm.DB {
	tx, ok := ctx.Value(contextTxKey{}).(*gorm.DB)
	if ok {
		return tx
	}
	return d.db.WithContext(ctx)
}

// Transaction is the interface used by the biz layer.
type Transaction interface {
	InTx(ctx context.Context, fn func(ctx context.Context) error) error
}

func (d *Data) InTx(ctx context.Context, fn func(ctx context.Context) error) error {
	return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		ctx = context.WithValue(ctx, contextTxKey{}, tx)
		return fn(ctx)
	})
}
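The DB/InTx pair relies on carrying the transaction handle in the context, so repositories pick it up without changing their signatures. The sketch below shows that pattern with a fake handle in place of *gorm.DB; `handle`, `store`, and `demo` are hypothetical names, and commit and rollback are only logged rather than performed.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// handle stands in for *gorm.DB; it only records the statements it is given.
type handle struct {
	name string
	log  *[]string
}

func (h *handle) exec(stmt string) { *h.log = append(*h.log, h.name+": "+stmt) }

type txKey struct{}

type store struct{ root *handle }

// db mirrors Data.DB: prefer the transaction stored in ctx, else the root handle.
func (s *store) db(ctx context.Context) *handle {
	if tx, ok := ctx.Value(txKey{}).(*handle); ok {
		return tx
	}
	return s.root
}

// inTx mirrors Data.InTx: attach a transaction handle to ctx, then log a
// commit or rollback depending on the callback's result.
func (s *store) inTx(ctx context.Context, fn func(ctx context.Context) error) error {
	tx := &handle{name: "tx", log: s.root.log}
	if err := fn(context.WithValue(ctx, txKey{}, tx)); err != nil {
		tx.exec("ROLLBACK")
		return err
	}
	tx.exec("COMMIT")
	return nil
}

// demo runs two repository-style writes in one transaction and returns the log.
func demo(fail bool) []string {
	log := []string{}
	s := &store{root: &handle{name: "root", log: &log}}
	_ = s.inTx(context.Background(), func(ctx context.Context) error {
		s.db(ctx).exec("INSERT folder")
		if fail {
			return errors.New("boom")
		}
		s.db(ctx).exec("INSERT file")
		return nil
	})
	s.db(context.Background()).exec("SELECT 1") // outside the transaction
	return log
}

func main() {
	fmt.Println(demo(false))
	fmt.Println(demo(true))
}
```

The design choice is that a usecase calls InTx once and passes the returned context to every repository, so all writes inside the callback share one transaction.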
  • Step 2: Create the GORM models (persistence objects), at the bottom of data.go or in a separate file

These are GORM table models, kept separate from the biz-layer entities. Continue adding them in internal/data/data.go:

// FolderPO is the GORM model for the folders table.
type FolderPO struct {
	ID        string    `gorm:"primaryKey;type:uuid;default:gen_random_uuid()"`
	ParentID  *string   `gorm:"type:uuid;index:idx_folders_parent"`
	Name      string    `gorm:"type:varchar(255);not null"`
	OwnerID   string    `gorm:"type:varchar(36);not null;index:idx_folders_owner"`
	CreatedAt time.Time `gorm:"autoCreateTime"`
	UpdatedAt time.Time `gorm:"autoUpdateTime"`
}

func (FolderPO) TableName() string { return "folders" }

// FileMetaPO is the GORM model for the files table.
type FileMetaPO struct {
	ID          string    `gorm:"primaryKey;type:uuid;default:gen_random_uuid()"`
	FolderID    string    `gorm:"type:uuid;index:idx_files_folder"`
	Name        string    `gorm:"type:varchar(255);not null"`
	S3Key       string    `gorm:"type:varchar(512);not null;index:idx_files_s3_key"`
	S3Bucket    string    `gorm:"type:varchar(255);not null"`
	Size        int64     `gorm:"default:0"`
	ContentType string    `gorm:"type:varchar(255);default:'application/octet-stream'"`
	OwnerID     string    `gorm:"type:varchar(36);not null;index:idx_files_owner"`
	CreatedAt   time.Time `gorm:"autoCreateTime"`
	UpdatedAt   time.Time `gorm:"autoUpdateTime"`
}

func (FileMetaPO) TableName() string { return "files" }

// ShareLinkPO is the GORM model for the share_links table.
type ShareLinkPO struct {
	ID            string     `gorm:"primaryKey;type:uuid;default:gen_random_uuid()"`
	ResourceType  string     `gorm:"type:varchar(10);not null"`
	ResourceID    string     `gorm:"type:uuid;not null"`
	Token         string     `gorm:"type:varchar(32);not null;uniqueIndex:idx_share_token"`
	Password      *string    `gorm:"type:varchar(255)"`
	ExpiresAt     *time.Time `gorm:"type:timestamptz"`
	DownloadCount int        `gorm:"default:0"`
	MaxDownloads  *int
	CreatedBy     string     `gorm:"type:varchar(36);not null"`
	CreatedAt     time.Time  `gorm:"autoCreateTime"`
}

func (ShareLinkPO) TableName() string { return "share_links" }

Note: the file header must import "time", which the time.Time fields above require.
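Since the persistence objects are kept separate from the biz-layer entities, each repo needs a small mapping step between the two. The sketch below shows one possible shape for folders; the `folder` entity and the `toBiz`/`toPO` helpers are assumptions for illustration, because the plan has not defined the biz entities yet, and `folderPO` repeats the FolderPO fields without GORM tags.

```go
package main

import (
	"fmt"
	"time"
)

// folderPO repeats the FolderPO fields (GORM tags omitted for brevity).
type folderPO struct {
	ID        string
	ParentID  *string // nil for a root folder
	Name      string
	OwnerID   string
	CreatedAt time.Time
	UpdatedAt time.Time
}

// folder is a hypothetical biz-layer entity; an empty ParentID means root.
type folder struct {
	ID        string
	ParentID  string
	Name      string
	OwnerID   string
	CreatedAt time.Time
	UpdatedAt time.Time
}

// toBiz converts a row into the biz entity, flattening the nullable parent.
func toBiz(po folderPO) folder {
	f := folder{ID: po.ID, Name: po.Name, OwnerID: po.OwnerID,
		CreatedAt: po.CreatedAt, UpdatedAt: po.UpdatedAt}
	if po.ParentID != nil {
		f.ParentID = *po.ParentID
	}
	return f
}

// toPO converts back, storing NULL instead of an empty string so the
// uuid column never receives an invalid value.
func toPO(f folder) folderPO {
	po := folderPO{ID: f.ID, Name: f.Name, OwnerID: f.OwnerID,
		CreatedAt: f.CreatedAt, UpdatedAt: f.UpdatedAt}
	if f.ParentID != "" {
		parent := f.ParentID
		po.ParentID = &parent
	}
	return po
}

func main() {
	parent := "p-1"
	child := toBiz(folderPO{ID: "c-1", ParentID: &parent, Name: "docs", CreatedAt: time.Now()})
	root := toBiz(folderPO{ID: "r-1", Name: "root"})
	fmt.Println(child.ParentID, root.ParentID == "", toPO(root).ParentID == nil)
}
```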

  • Step 3: Create the S3 file repo (internal/data/file_repo.go)

Migrate from internal/infrastructure/s3/file_repository_impl.go, keeping the AWS SDK v2 usage unchanged.

package data

import (
	"context"
	"fmt"
	"io"
	"time"

	"rag/file-system/internal/conf"
	"rag/file-system/internal/pkg/s3errors"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

type FileRepo struct {
	client        *s3.Client
	presignClient *s3.PresignClient
}

func NewFileRepo(c *conf.Data) *FileRepo {
	customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
		return aws.Endpoint{
			URL:           c.S3.Endpoint,
			SigningRegion: c.S3.Region,
		}, nil
	})

	awsCfg, err := config.LoadDefaultConfig(context.TODO(),
		config.WithRegion(c.S3.Region),
		config.WithEndpointResolverWithOptions(customResolver),
		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
			c.S3.AccessKey,
			c.S3.SecretKey,
			"",
		)),
	)
	if err != nil {
		panic(fmt.Sprintf("unable to load S3 config: %v", err))
	}

	client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
		o.UsePathStyle = true
	})

	return &FileRepo{
		client:        client,
		presignClient: s3.NewPresignClient(client),
	}
}

func (r *FileRepo) UploadFile(ctx context.Context, bucket, key string, data io.Reader) error {
	_, err := r.client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
		Body:   data,
	})
	return s3errors.Wrap(err)
}

func (r *FileRepo) DownloadFile(ctx context.Context, bucket, key string) (io.ReadCloser, error) {
	out, err := r.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return nil, s3errors.Wrap(err)
	}
	return out.Body, nil
}

func (r *FileRepo) ListBuckets(ctx context.Context) ([]string, error) {
	out, err := r.client.ListBuckets(ctx, &s3.ListBucketsInput{})
	if err != nil {
		return nil, s3errors.Wrap(err)
	}
	buckets := make([]string, 0, len(out.Buckets))
	for _, b := range out.Buckets {
		buckets = append(buckets, aws.ToString(b.Name)) // nil-safe dereference
	}
	return buckets, nil
}

func (r *FileRepo) CreateBucket(ctx context.Context, name string) error {
	_, err := r.client.CreateBucket(ctx, &s3.CreateBucketInput{
		Bucket: aws.String(name),
	})
	return s3errors.Wrap(err)
}

func (r *FileRepo) DeleteBucket(ctx context.Context, name string) error {
	_, err := r.client.DeleteBucket(ctx, &s3.DeleteBucketInput{
		Bucket: aws.String(name),
	})
	return s3errors.Wrap(err)
}

func (r *FileRepo) ListObjectsV2(ctx context.Context, bucket, prefix string, maxKeys int32, token *string) ([]FileInfo, *string, error) {
	input := &s3.ListObjectsV2Input{
		Bucket:            aws.String(bucket),
		Prefix:            aws.String(prefix),
		MaxKeys:           aws.Int32(maxKeys),
		ContinuationToken: token,
	}
	out, err := r.client.ListObjectsV2(ctx, input)
	if err != nil {
		return nil, nil, s3errors.Wrap(err)
	}
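	files := make([]FileInfo, 0, len(out.Contents))
	for _, obj := range out.Contents {
		// The aws.To* helpers return zero values instead of panicking on nil pointers.
		files = append(files, FileInfo{
			Key:          aws.ToString(obj.Key),
			Size:         aws.ToInt64(obj.Size),
			LastModified: aws.ToTime(obj.LastModified),
			ETag:         aws.ToString(obj.ETag),
		})
	}
	return files, out.NextContinuationToken, nil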
}

func (r *FileRepo) GeneratePresignedURL(ctx context.Context, bucket, key string, expiry time.Duration) (string, error) {
	req, err := r.presignClient.PresignGetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	}, func(opts *s3.PresignOptions) {
		opts.Expires = expiry
	})
	if err != nil {
		return "", s3errors.Wrap(err)
	}
	return req.URL, nil
}

func (r *FileRepo) GetFileContent(ctx context.Context, bucket, key string) (string, error) {
	out, err := r.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return "", s3errors.Wrap(err)
	}
	defer out.Body.Close()
	data, err := io.ReadAll(io.LimitReader(out.Body, 10<<20)) // 10MB max
	if err != nil {
		return "", err
	}
	return string(data), nil
}

func (r *FileRepo) DeleteFile(ctx context.Context, bucket, key string) error {
	_, err := r.client.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	return s3errors.Wrap(err)
}

func (r *FileRepo) CreateMultipartUpload(ctx context.Context, bucket, key string) (string, error) {
	out, err := r.client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return "", s3errors.Wrap(err)
	}
	return aws.ToString(out.UploadId), nil
}

func (r *FileRepo) UploadPart(ctx context.Context, bucket, key, uploadID string, partNumber int32, data io.Reader) (string, error) {
	out, err := r.client.UploadPart(ctx, &s3.UploadPartInput{
		Bucket:     aws.String(bucket),
		Key:        aws.String(key),
		UploadId:   aws.String(uploadID),
		PartNumber: aws.Int32(partNumber),
		Body:       data,
	})
	if err != nil {
		return "", s3errors.Wrap(err)
	}
	return aws.ToString(out.ETag), nil
}

func (r *FileRepo) CompleteMultipartUpload(ctx context.Context, bucket, key, uploadID string, parts []Part) (string, error) {
	completedParts := make([]types.CompletedPart, 0, len(parts))
	for _, p := range parts {
		completedParts = append(completedParts, types.CompletedPart{
			PartNumber: aws.Int32(p.PartNumber),
			ETag:       aws.String(p.ETag),
		})
	}
	out, err := r.client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(bucket),
		Key:      aws.String(key),
		UploadId: aws.String(uploadID),
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})
	if err != nil {
		return "", s3errors.Wrap(err)
	}
	return aws.ToString(out.Location), nil
}

func (r *FileRepo) AbortMultipartUpload(ctx context.Context, bucket, key, uploadID string) error {
	_, err := r.client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
		Bucket:   aws.String(bucket),
		Key:      aws.String(key),
		UploadId: aws.String(uploadID),
	})
	return s3errors.Wrap(err)
}

// FileInfo and Part are structs shared within the data layer.
type FileInfo struct {
	Key          string
	Size         int64
	LastModified time.Time
	ETag         string
}

type Part struct {
	PartNumber int32
	ETag       string
}
  • Step 4: Create the folder repo (internal/data/folder_repo.go)

Migrate internal/infrastructure/repository/folder_repo_impl.go to GORM.

package data

import (
	"context"
	"errors"

	v1 "rag/file-system/api/file/v1"

	"gorm.io/gorm"
)

type FolderRepo struct {
	data *Data
}

func NewFolderRepo(data *Data) *FolderRepo {
	return &FolderRepo{data: data}
}

func (r *FolderRepo) Create(ctx context.Context, po *FolderPO) error {
	return r.data.DB(ctx).Create(po).Error
}

func (r *FolderRepo) GetByID(ctx context.Context, id string) (*FolderPO, error) {
	var po FolderPO
	err := r.data.DB(ctx).Where("id = ?", id).First(&po).Error
	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, v1.ErrorFolderNotFound("folder %s not found", id)
	}
	if err != nil {
		return nil, err
	}
	return &po, nil
}

func (r *FolderRepo) GetWithChildren(ctx context.Context, id, ownerID string) (*FolderPO, []FolderPO, []FileMetaPO, error) {
	var folder FolderPO
	if err := r.data.DB(ctx).Where("id = ? AND owner_id = ?", id, ownerID).First(&folder).Error; err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return nil, nil, nil, v1.ErrorFolderNotFound("folder %s not found", id)
		}
		return nil, nil, nil, err
	}
	// Propagate query errors instead of silently returning empty children.
	var subFolders []FolderPO
	if err := r.data.DB(ctx).Where("parent_id = ?", id).Find(&subFolders).Error; err != nil {
		return nil, nil, nil, err
	}
	var files []FileMetaPO
	if err := r.data.DB(ctx).Where("folder_id = ?", id).Find(&files).Error; err != nil {
		return nil, nil, nil, err
	}
	return &folder, subFolders, files, nil
}

func (r *FolderRepo) GetTree(ctx context.Context, ownerID string) ([]FolderPO, error) {
	var folders []FolderPO
	err := r.data.DB(ctx).Where("owner_id = ?", ownerID).Order("created_at ASC").Find(&folders).Error
	return folders, err
}

func (r *FolderRepo) Update(ctx context.Context, po *FolderPO) error {
	return r.data.DB(ctx).Save(po).Error
}

func (r *FolderRepo) Delete(ctx context.Context, id, ownerID string) error {
	return r.data.DB(ctx).Where("id = ? AND owner_id = ?", id, ownerID).Delete(&FolderPO{}).Error
}

func (r *FolderRepo) GetDescendantFileS3Keys(ctx context.Context, id, ownerID string) ([]FileMetaPO, error) {
	var files []FileMetaPO
	// Use a recursive CTE to find the files of all descendant folders.
	err := r.data.DB(ctx).Raw(`
		WITH RECURSIVE descendants AS (
			SELECT id FROM folders WHERE id = ? AND owner_id = ?
			UNION ALL
			SELECT f.id FROM folders f INNER JOIN descendants d ON f.parent_id = d.id
		)
		SELECT fm.* FROM files fm WHERE fm.folder_id IN (SELECT id FROM descendants)
	`, id, ownerID).Scan(&files).Error
	return files, err
}

Note: the ErrorFolderNotFound helper from api/file/v1 only compiles after the proto code has been generated. Task 2 generates these functions.

  • Step 5: Create the file meta repo (internal/data/file_meta_repo.go)
package data

import (
	"context"
	"errors"

	"gorm.io/gorm"
)

type FileMetaRepo struct {
	data *Data
}

func NewFileMetaRepo(data *Data) *FileMetaRepo {
	return &FileMetaRepo{data: data}
}

func (r *FileMetaRepo) Create(ctx context.Context, po *FileMetaPO) error {
	return r.data.DB(ctx).Create(po).Error
}

func (r *FileMetaRepo) GetByID(ctx context.Context, id string) (*FileMetaPO, error) {
	var po FileMetaPO
	err := r.data.DB(ctx).Where("id = ?", id).First(&po).Error
	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, errors.New("file not found")
	}
	if err != nil {
		return nil, err
	}
	return &po, nil
}

func (r *FileMetaRepo) GetByFolder(ctx context.Context, folderID string) ([]FileMetaPO, error) {
	var files []FileMetaPO
	err := r.data.DB(ctx).Where("folder_id = ?", folderID).Order("created_at ASC").Find(&files).Error
	return files, err
}

func (r *FileMetaRepo) Move(ctx context.Context, fileID, targetFolderID, ownerID string) error {
	return r.data.DB(ctx).Model(&FileMetaPO{}).Where("id = ? AND owner_id = ?", fileID, ownerID).
		Update("folder_id", targetFolderID).Error
}

func (r *FileMetaRepo) Delete(ctx context.Context, id, ownerID string) error {
	return r.data.DB(ctx).Where("id = ? AND owner_id = ?", id, ownerID).Delete(&FileMetaPO{}).Error
}
  • Step 6: Create the share repo (internal/data/share_repo.go)
package data

import (
	"context"
	"errors"

	"gorm.io/gorm"
)

type ShareRepo struct {
	data *Data
}

func NewShareRepo(data *Data) *ShareRepo {
	return &ShareRepo{data: data}
}

func (r *ShareRepo) Create(ctx context.Context, po *ShareLinkPO) error {
	return r.data.DB(ctx).Create(po).Error
}

func (r *ShareRepo) GetByToken(ctx context.Context, token string) (*ShareLinkPO, error) {
	var po ShareLinkPO
	err := r.data.DB(ctx).Where("token = ?", token).First(&po).Error
	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, errors.New("share not found")
	}
	if err != nil {
		return nil, err
	}
	return &po, nil
}

func (r *ShareRepo) GetByID(ctx context.Context, id string) (*ShareLinkPO, error) {
	var po ShareLinkPO
	err := r.data.DB(ctx).Where("id = ?", id).First(&po).Error
	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, errors.New("share not found")
	}
	if err != nil {
		return nil, err
	}
	return &po, nil
}

func (r *ShareRepo) Delete(ctx context.Context, id, createdBy string) error {
	return r.data.DB(ctx).Where("id = ? AND created_by = ?", id, createdBy).Delete(&ShareLinkPO{}).Error
}

func (r *ShareRepo) IncrementDownloadCount(ctx context.Context, token string) error {
	return r.data.DB(ctx).Model(&ShareLinkPO{}).Where("token = ?", token).
		UpdateColumn("download_count", gorm.Expr("download_count + 1")).Error
}

func (r *ShareRepo) ListByResource(ctx context.Context, resourceType, resourceID string) ([]ShareLinkPO, error) {
	var shares []ShareLinkPO
	err := r.data.DB(ctx).Where("resource_type = ? AND resource_id = ?", resourceType, resourceID).Find(&shares).Error
	return shares, err
}
  • Step 7: Delete the old infrastructure files
rm -rf internal/infrastructure/
  • Step 8: Commit
git add -A
git commit -m "feat: add data layer with GORM models, S3 repo, PG repos"

Task 6: Create the biz layer (business logic + repo interface definitions)

Files:

  • Create: internal/biz/biz.go

  • Create: internal/biz/file.go

  • Create: internal/biz/bucket.go

  • Create: internal/biz/folder.go

  • Create: internal/biz/share.go

  • Delete: internal/domain/

  • Step 1: Create biz.go (ProviderSet + Transaction interface)

package biz

import (
	"github.com/google/wire"
)

var ProviderSet = wire.NewSet(
	NewFileUsecase,
	NewBucketUsecase,
	NewFolderUsecase,
	NewShareUsecase,
)
  • Step 2: Create file.go (file usecase)
package biz

import (
	"context"
	"io"
	"time"

	"rag/file-system/internal/data"

	"github.com/go-kratos/kratos/v2/log"
)

// FileRepo defines the S3 file operations (implemented by data.FileRepo).
type FileRepo interface {
	UploadFile(ctx context.Context, bucket, key string, data io.Reader) error
	DownloadFile(ctx context.Context, bucket, key string) (io.ReadCloser, error)
	ListObjectsV2(ctx context.Context, bucket, prefix string, maxKeys int32, token *string) ([]FileInfo, *string, error)
	GeneratePresignedURL(ctx context.Context, bucket, key string, expiry time.Duration) (string, error)
	GetFileContent(ctx context.Context, bucket, key string) (string, error)
	DeleteFile(ctx context.Context, bucket, key string) error
	CreateMultipartUpload(ctx context.Context, bucket, key string) (string, error)
	UploadPart(ctx context.Context, bucket, key, uploadID string, partNumber int32, data io.Reader) (string, error)
	CompleteMultipartUpload(ctx context.Context, bucket, key, uploadID string, parts []Part) (string, error)
	AbortMultipartUpload(ctx context.Context, bucket, key, uploadID string) error
}

// FileInfo aliases the data-layer type. An alias to an anonymous struct would
// make []FileInfo differ from []data.FileInfo, and data.FileRepo would then
// not satisfy this interface.
type FileInfo = data.FileInfo

// Part aliases the data-layer type for the same reason.
type Part = data.Part

type FileUsecase struct {
	repo FileRepo
	log  *log.Helper
}

func NewFileUsecase(repo FileRepo, logger log.Logger) *FileUsecase {
	return &FileUsecase{repo: repo, log: log.NewHelper(logger)}
}

func (uc *FileUsecase) UploadFile(ctx context.Context, bucket, key string, data io.Reader) error {
	return uc.repo.UploadFile(ctx, bucket, key, data)
}

func (uc *FileUsecase) DownloadFile(ctx context.Context, bucket, key string) (io.ReadCloser, error) {
	return uc.repo.DownloadFile(ctx, bucket, key)
}

func (uc *FileUsecase) ListFiles(ctx context.Context, bucket, prefix string, maxKeys int32, token *string) ([]FileInfo, *string, error) {
	return uc.repo.ListObjectsV2(ctx, bucket, prefix, maxKeys, token)
}

func (uc *FileUsecase) GetPreviewURL(ctx context.Context, bucket, key string) (string, error) {
	return uc.repo.GeneratePresignedURL(ctx, bucket, key, 24*time.Hour)
}

func (uc *FileUsecase) GetFileContent(ctx context.Context, bucket, key string) (string, error) {
	return uc.repo.GetFileContent(ctx, bucket, key)
}

func (uc *FileUsecase) DeleteFile(ctx context.Context, bucket, key string) error {
	return uc.repo.DeleteFile(ctx, bucket, key)
}

func (uc *FileUsecase) InitMultipart(ctx context.Context, bucket, key string) (string, error) {
	return uc.repo.CreateMultipartUpload(ctx, bucket, key)
}

func (uc *FileUsecase) UploadPart(ctx context.Context, bucket, key, uploadID string, partNumber int32, data io.Reader) (string, error) {
	return uc.repo.UploadPart(ctx, bucket, key, uploadID, partNumber, data)
}

func (uc *FileUsecase) CompleteMultipart(ctx context.Context, bucket, key, uploadID string, parts []Part) (string, error) {
	return uc.repo.CompleteMultipartUpload(ctx, bucket, key, uploadID, parts)
}

func (uc *FileUsecase) AbortMultipart(ctx context.Context, bucket, key, uploadID string) error {
	return uc.repo.AbortMultipartUpload(ctx, bucket, key, uploadID)
}
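
A data-layer type only satisfies a biz-layer interface when the method signatures are identical, including element types of slices. The sketch below shows the alias technique with hypothetical stand-ins (`storedInfo` for the data-layer struct, `Lister` for the biz interface) and a compile-time assertion that catches a mismatch before Wire does:

```go
package main

import "fmt"

// storedInfo plays the role of the named struct defined in the data layer.
type storedInfo struct {
	Key  string
	Size int64
}

// Info is an alias, so Info and storedInfo are one and the same type.
type Info = storedInfo

// Lister plays the role of the repo interface declared in the biz layer.
type Lister interface {
	List() []Info
}

// s3Lister plays the role of the concrete repo.
type s3Lister struct{}

// List is written against the data-layer name, yet it satisfies Lister
// because []storedInfo and []Info are identical types.
func (s3Lister) List() []storedInfo {
	return []storedInfo{{Key: "a.txt", Size: 3}}
}

// Compile-time assertion: the build fails here if the signatures drift apart.
var _ Lister = s3Lister{}

func main() {
	var l Lister = s3Lister{}
	fmt.Println(l.List()[0].Key) // a.txt
}
```

Placing one such `var _ Interface = (*Impl)(nil)` line next to each repo keeps signature mismatches local to the file that caused them.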
  • Step 3: Create bucket.go (bucket usecase)
package biz

import (
	"context"

	"github.com/go-kratos/kratos/v2/log"
)

type BucketUsecase struct {
	repo FileRepo
	log  *log.Helper
}

func NewBucketUsecase(repo FileRepo, logger log.Logger) *BucketUsecase {
	return &BucketUsecase{repo: repo, log: log.NewHelper(logger)}
}

func (uc *BucketUsecase) CreateBucket(ctx context.Context, name string) error {
	return uc.repo.CreateBucket(ctx, name)
}

func (uc *BucketUsecase) ListBuckets(ctx context.Context) ([]string, error) {
	return uc.repo.ListBuckets(ctx)
}

func (uc *BucketUsecase) DeleteBucket(ctx context.Context, name string) error {
	return uc.repo.DeleteBucket(ctx, name)
}

Note: the FileRepo interface also needs the ListBuckets, CreateBucket, and DeleteBucket methods. Add them to the FileRepo interface in file.go:

ListBuckets(ctx context.Context) ([]string, error)
CreateBucket(ctx context.Context, name string) error
DeleteBucket(ctx context.Context, name string) error
  • Step 4: Create folder.go (folder usecase)
package biz

import (
	"bytes"
	"context"

	v1 "rag/file-system/api/file/v1"

	"rag/file-system/internal/data"
	"rag/file-system/internal/pkg/sanitize"

	"github.com/go-kratos/kratos/v2/log"
	"github.com/google/uuid"
)

// FolderMetaRepo defines the folder metadata operations (implemented by data.FolderRepo).
type FolderMetaRepo interface {
	Create(ctx context.Context, po *data.FolderPO) error
	GetByID(ctx context.Context, id string) (*data.FolderPO, error)
	GetWithChildren(ctx context.Context, id, ownerID string) (*data.FolderPO, []data.FolderPO, []data.FileMetaPO, error)
	GetTree(ctx context.Context, ownerID string) ([]data.FolderPO, error)
	Update(ctx context.Context, po *data.FolderPO) error
	Delete(ctx context.Context, id, ownerID string) error
	GetDescendantFileS3Keys(ctx context.Context, id, ownerID string) ([]data.FileMetaPO, error)
}

// FileMetaRepo defines the file metadata operations (implemented by data.FileMetaRepo).
type FileMetaRepo interface {
	Create(ctx context.Context, po *data.FileMetaPO) error
	GetByID(ctx context.Context, id string) (*data.FileMetaPO, error)
	GetByFolder(ctx context.Context, folderID string) ([]data.FileMetaPO, error)
	Move(ctx context.Context, fileID, targetFolderID, ownerID string) error
	Delete(ctx context.Context, id, ownerID string) error
}

type FolderUsecase struct {
	folderRepo   FolderMetaRepo
	fileMetaRepo FileMetaRepo
	fileRepo     FileRepo
	log          *log.Helper
}

func NewFolderUsecase(folderRepo FolderMetaRepo, fileMetaRepo FileMetaRepo, fileRepo FileRepo, logger log.Logger) *FolderUsecase {
	return &FolderUsecase{folderRepo: folderRepo, fileMetaRepo: fileMetaRepo, fileRepo: fileRepo, log: log.NewHelper(logger)}
}

func (uc *FolderUsecase) CreateFolder(ctx context.Context, parentID *string, name, ownerID string) (*data.FolderPO, error) {
	po := &data.FolderPO{
		ID:       uuid.New().String(),
		ParentID: parentID,
		Name:     sanitize.Filename(name),
		OwnerID:  ownerID,
	}
	if err := uc.folderRepo.Create(ctx, po); err != nil {
		return nil, err
	}
	return po, nil
}

func (uc *FolderUsecase) GetFolder(ctx context.Context, id, ownerID string) (*data.FolderPO, []data.FolderPO, []data.FileMetaPO, error) {
	return uc.folderRepo.GetWithChildren(ctx, id, ownerID)
}

func (uc *FolderUsecase) GetFolderTree(ctx context.Context, ownerID string) ([]data.FolderPO, error) {
	return uc.folderRepo.GetTree(ctx, ownerID)
}

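func (uc *FolderUsecase) RenameFolder(ctx context.Context, id, name, ownerID string) (*data.FolderPO, error) {
	po, err := uc.folderRepo.GetByID(ctx, id)
	if err != nil {
		return nil, err
	}
	// GetByID does not filter by owner, so enforce ownership here. Report
	// "not found" so that other users' folder IDs are not disclosed.
	if po.OwnerID != ownerID {
		return nil, v1.ErrorFolderNotFound("folder %s not found", id)
	}
	po.Name = sanitize.Filename(name)
	if err := uc.folderRepo.Update(ctx, po); err != nil {
		return nil, err
	}
	return po, nil
}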

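func (uc *FolderUsecase) DeleteFolder(ctx context.Context, id, ownerID string) error {
	// Collect the S3 keys of all descendant files and delete the objects.
	files, err := uc.folderRepo.GetDescendantFileS3Keys(ctx, id, ownerID)
	if err != nil {
		return err
	}
	for _, f := range files {
		// Best effort: a failed S3 delete leaves an orphaned object but does
		// not block removing the folder, so log it instead of discarding it.
		if err := uc.fileRepo.DeleteFile(ctx, f.S3Bucket, f.S3Key); err != nil {
			uc.log.Warnf("delete s3 object %s/%s: %v", f.S3Bucket, f.S3Key, err)
		}
	}
	return uc.folderRepo.Delete(ctx, id, ownerID)
}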

func (uc *FolderUsecase) UploadToFolder(ctx context.Context, folderID, fileName string, fileData []byte, contentType, ownerID string) (*data.FileMetaPO, error) {
	s3Key := uuid.New().String()
	bucket := "default"

	if err := uc.fileRepo.UploadFile(ctx, bucket, s3Key, bytes.NewReader(fileData)); err != nil {
		return nil, err
	}

	po := &data.FileMetaPO{
		ID:          uuid.New().String(),
		FolderID:    folderID,
		Name:        sanitize.Filename(fileName),
		S3Key:       s3Key,
		S3Bucket:    bucket,
		Size:        int64(len(fileData)),
		ContentType: contentType,
		OwnerID:     ownerID,
	}
	if err := uc.fileMetaRepo.Create(ctx, po); err != nil {
		return nil, err
	}
	return po, nil
}

func (uc *FolderUsecase) MoveFile(ctx context.Context, fileID, targetFolderID, ownerID string) error {
	return uc.fileMetaRepo.Move(ctx, fileID, targetFolderID, ownerID)
}

Note: this file must import "bytes" (used by UploadToFolder).

  • Step 5: Create share.go (share usecase)
package biz

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"time"

	"rag/file-system/internal/data"

	"github.com/go-kratos/kratos/v2/log"
	"github.com/google/uuid"
)

type ShareMetaRepo interface {
	Create(ctx context.Context, po *data.ShareLinkPO) error
	GetByToken(ctx context.Context, token string) (*data.ShareLinkPO, error)
	GetByID(ctx context.Context, id string) (*data.ShareLinkPO, error)
	Delete(ctx context.Context, id, createdBy string) error
	IncrementDownloadCount(ctx context.Context, token string) error
}

type ShareUsecase struct {
	shareRepo    ShareMetaRepo
	fileMetaRepo FileMetaRepo
	fileRepo     FileRepo
	log          *log.Helper
}

func NewShareUsecase(shareRepo ShareMetaRepo, fileMetaRepo FileMetaRepo, fileRepo FileRepo, logger log.Logger) *ShareUsecase {
	return &ShareUsecase{shareRepo: shareRepo, fileMetaRepo: fileMetaRepo, fileRepo: fileRepo, log: log.NewHelper(logger)}
}

func (uc *ShareUsecase) CreateShare(ctx context.Context, resourceType, resourceID, password string, expiresAt *time.Time, maxDownloads *int, createdBy string) (*data.ShareLinkPO, error) {
	tokenBytes := make([]byte, 16)
	if _, err := rand.Read(tokenBytes); err != nil {
		return nil, err
	}
	token := hex.EncodeToString(tokenBytes)

	po := &data.ShareLinkPO{
		ID:           uuid.New().String(),
		ResourceType: resourceType,
		ResourceID:   resourceID,
		Token:        token,
		ExpiresAt:    expiresAt,
		MaxDownloads: maxDownloads,
		CreatedBy:    createdBy,
	}
	if password != "" {
		po.Password = &password
	}
	if err := uc.shareRepo.Create(ctx, po); err != nil {
		return nil, err
	}
	return po, nil
}

func (uc *ShareUsecase) DeleteShare(ctx context.Context, id, createdBy string) error {
	return uc.shareRepo.Delete(ctx, id, createdBy)
}

func (uc *ShareUsecase) GetShareInfo(ctx context.Context, token string) (*data.ShareLinkPO, *data.FileMetaPO, error) {
	share, err := uc.shareRepo.GetByToken(ctx, token)
	if err != nil {
		return nil, nil, err
	}
	file, err := uc.fileMetaRepo.GetByID(ctx, share.ResourceID)
	if err != nil {
		return nil, nil, err
	}
	return share, file, nil
}

func (uc *ShareUsecase) DownloadShare(ctx context.Context, token string) (string, string, error) {
	share, err := uc.shareRepo.GetByToken(ctx, token)
	if err != nil {
		return "", "", err
	}
	if share.ExpiresAt != nil && share.ExpiresAt.Before(time.Now()) {
		return "", "", fmt.Errorf("share link expired")
	}
	if err := uc.shareRepo.IncrementDownloadCount(ctx, token); err != nil {
		return "", "", err
	}
	file, err := uc.fileMetaRepo.GetByID(ctx, share.ResourceID)
	if err != nil {
		return "", "", err
	}
	url, err := uc.fileRepo.GeneratePresignedURL(ctx, file.S3Bucket, file.S3Key, 5*time.Minute)
	if err != nil {
		return "", "", err
	}
	return url, file.Name, nil
}
  • Step 6: Delete the old domain layer
rm -rf internal/domain/
  • Step 7: Commit
git add -A
git commit -m "feat: add biz layer with usecases for file, bucket, folder, share"

Task 7: Create the service layer (proto interface implementation)

Files:

  • Create: internal/service/service.go

  • Create: internal/service/file.go

  • Step 1: Create service.go (ProviderSet)

package service

import "github.com/google/wire"

var ProviderSet = wire.NewSet(
	NewFileService,
)
  • Step 2: Create file.go (implements the FileService interface defined in the proto)
package service

import (
	"bytes"
	"context"
	"io"

	pb "rag/file-system/api/file/v1"
	"rag/file-system/internal/biz"

	"github.com/go-kratos/kratos/v2/log"
)

type FileService struct {
	pb.UnimplementedFileServiceServer

	fileUC   *biz.FileUsecase
	bucketUC *biz.BucketUsecase
	folderUC *biz.FolderUsecase
	shareUC  *biz.ShareUsecase
	log      *log.Helper
}

func NewFileService(fileUC *biz.FileUsecase, bucketUC *biz.BucketUsecase, folderUC *biz.FolderUsecase, shareUC *biz.ShareUsecase, logger log.Logger) *FileService {
	return &FileService{
		fileUC:   fileUC,
		bucketUC: bucketUC,
		folderUC: folderUC,
		shareUC:  shareUC,
		log:      log.NewHelper(logger),
	}
}

// === File operations ===

func (s *FileService) UploadFile(ctx context.Context, req *pb.UploadFileRequest) (*pb.UploadFileResponse, error) {
	if err := s.fileUC.UploadFile(ctx, req.BucketName, req.ObjectKey, bytes.NewReader(req.Data)); err != nil {
		return nil, err
	}
	return &pb.UploadFileResponse{Message: "uploaded", ObjectKey: req.ObjectKey}, nil
}

func (s *FileService) DownloadFile(ctx context.Context, req *pb.DownloadFileRequest) (*pb.DownloadFileResponse, error) {
	reader, err := s.fileUC.DownloadFile(ctx, req.BucketName, req.ObjectKey)
	if err != nil {
		return nil, err
	}
	defer reader.Close()
	data, err := io.ReadAll(reader)
	if err != nil {
		return nil, err
	}
	return &pb.DownloadFileResponse{Data: data}, nil
}

func (s *FileService) ListFiles(ctx context.Context, req *pb.ListFilesRequest) (*pb.ListFilesResponse, error) {
	// An empty continuation token means "first page". Pass nil in that case
	// rather than a pointer to "", which would be sent to S3 as a token.
	var token *string
	if req.ContinuationToken != "" {
		token = &req.ContinuationToken
	}
	files, nextToken, err := s.fileUC.ListFiles(ctx, req.BucketName, req.Prefix, req.MaxKeys, token)
	if err != nil {
		return nil, err
	}
	resp := &pb.ListFilesResponse{}
	if nextToken != nil {
		resp.NextContinuationToken = *nextToken
	}
	for _, f := range files {
		resp.Files = append(resp.Files, &pb.FileInfo{
			Key: f.Key, Size: f.Size,
			// The layout ends in a literal "Z", so convert to UTC first.
			LastModified: f.LastModified.UTC().Format("2006-01-02T15:04:05Z"),
			Etag:         f.ETag,
		})
	}
	return resp, nil
}

func (s *FileService) GetFilePreview(ctx context.Context, req *pb.GetFilePreviewRequest) (*pb.GetFilePreviewResponse, error) {
	url, err := s.fileUC.GetPreviewURL(ctx, req.BucketName, req.ObjectKey)
	if err != nil {
		return nil, err
	}
	return &pb.GetFilePreviewResponse{PresignedUrl: url}, nil
}

func (s *FileService) GetFileContent(ctx context.Context, req *pb.GetFileContentRequest) (*pb.GetFileContentResponse, error) {
	content, err := s.fileUC.GetFileContent(ctx, req.BucketName, req.ObjectKey)
	if err != nil {
		return nil, err
	}
	return &pb.GetFileContentResponse{Content: content}, nil
}

func (s *FileService) DeleteFile(ctx context.Context, req *pb.DeleteFileRequest) (*pb.Empty, error) {
	return &pb.Empty{}, s.fileUC.DeleteFile(ctx, req.BucketName, req.ObjectKey)
}

// === Multipart upload ===

func (s *FileService) InitMultipartUpload(ctx context.Context, req *pb.InitMultipartRequest) (*pb.InitMultipartResponse, error) {
	uploadID, err := s.fileUC.InitMultipart(ctx, req.BucketName, req.ObjectKey)
	if err != nil {
		return nil, err
	}
	return &pb.InitMultipartResponse{UploadId: uploadID}, nil
}

func (s *FileService) UploadPart(ctx context.Context, req *pb.UploadPartRequest) (*pb.UploadPartResponse, error) {
	etag, err := s.fileUC.UploadPart(ctx, req.BucketName, req.ObjectKey, req.UploadId, req.PartNumber, bytes.NewReader(req.Data))
	if err != nil {
		return nil, err
	}
	return &pb.UploadPartResponse{Etag: etag}, nil
}

func (s *FileService) CompleteMultipartUpload(ctx context.Context, req *pb.CompleteMultipartRequest) (*pb.CompleteMultipartResponse, error) {
	parts := make([]biz.Part, 0, len(req.Parts))
	for _, p := range req.Parts {
		parts = append(parts, biz.Part{PartNumber: p.PartNumber, ETag: p.Etag})
	}
	location, err := s.fileUC.CompleteMultipart(ctx, req.BucketName, req.ObjectKey, req.UploadId, parts)
	if err != nil {
		return nil, err
	}
	return &pb.CompleteMultipartResponse{Location: location}, nil
}

func (s *FileService) AbortMultipartUpload(ctx context.Context, req *pb.AbortMultipartRequest) (*pb.Empty, error) {
	return &pb.Empty{}, s.fileUC.AbortMultipart(ctx, req.BucketName, req.ObjectKey, req.UploadId)
}

// === Bucket operations ===

func (s *FileService) CreateBucket(ctx context.Context, req *pb.CreateBucketRequest) (*pb.Empty, error) {
	return &pb.Empty{}, s.bucketUC.CreateBucket(ctx, req.Name)
}

func (s *FileService) ListBuckets(ctx context.Context, req *pb.Empty) (*pb.ListBucketsResponse, error) {
	buckets, err := s.bucketUC.ListBuckets(ctx)
	if err != nil {
		return nil, err
	}
	return &pb.ListBucketsResponse{Buckets: buckets}, nil
}

func (s *FileService) DeleteBucket(ctx context.Context, req *pb.DeleteBucketRequest) (*pb.Empty, error) {
	return &pb.Empty{}, s.bucketUC.DeleteBucket(ctx, req.Name)
}

// === Folder operations ===

func (s *FileService) CreateFolder(ctx context.Context, req *pb.CreateFolderRequest) (*pb.Folder, error) {
	po, err := s.folderUC.CreateFolder(ctx, req.ParentId, req.Name, req.OwnerId)
	if err != nil {
		return nil, err
	}
	return &pb.Folder{Id: po.ID, ParentId: stringValue(po.ParentID), Name: po.Name, OwnerId: po.OwnerID}, nil
}

func (s *FileService) GetFolderTree(ctx context.Context, req *pb.GetFolderTreeRequest) (*pb.GetFolderTreeResponse, error) {
	folders, err := s.folderUC.GetFolderTree(ctx, req.OwnerId)
	if err != nil {
		return nil, err
	}
	resp := &pb.GetFolderTreeResponse{}
	for _, f := range folders {
		resp.Folders = append(resp.Folders, &pb.Folder{Id: f.ID, ParentId: stringValue(f.ParentID), Name: f.Name, OwnerId: f.OwnerID})
	}
	return resp, nil
}

func (s *FileService) GetFolder(ctx context.Context, req *pb.GetFolderRequest) (*pb.FolderWithChildren, error) {
	folder, subs, files, err := s.folderUC.GetFolder(ctx, req.Id, req.OwnerId)
	if err != nil {
		return nil, err
	}
	resp := &pb.FolderWithChildren{
		Folder: &pb.Folder{Id: folder.ID, ParentId: stringValue(folder.ParentID), Name: folder.Name, OwnerId: folder.OwnerID},
	}
	for _, sf := range subs {
		resp.SubFolders = append(resp.SubFolders, &pb.Folder{Id: sf.ID, ParentId: stringValue(sf.ParentID), Name: sf.Name})
	}
	for _, f := range files {
		resp.Files = append(resp.Files, &pb.FileMeta{Id: f.ID, Name: f.Name, S3Key: f.S3Key, Size: f.Size})
	}
	return resp, nil
}

func (s *FileService) RenameFolder(ctx context.Context, req *pb.RenameFolderRequest) (*pb.Folder, error) {
	po, err := s.folderUC.RenameFolder(ctx, req.Id, req.Name, req.OwnerId)
	if err != nil {
		return nil, err
	}
	return &pb.Folder{Id: po.ID, Name: po.Name, OwnerId: po.OwnerID}, nil
}

func (s *FileService) DeleteFolder(ctx context.Context, req *pb.DeleteFolderRequest) (*pb.Empty, error) {
	return &pb.Empty{}, s.folderUC.DeleteFolder(ctx, req.Id, req.OwnerId)
}

func (s *FileService) UploadToFolder(ctx context.Context, req *pb.UploadToFolderRequest) (*pb.FileMeta, error) {
	po, err := s.folderUC.UploadToFolder(ctx, req.FolderId, req.FileName, req.Data, req.ContentType, req.OwnerId)
	if err != nil {
		return nil, err
	}
	return &pb.FileMeta{Id: po.ID, FolderId: po.FolderID, Name: po.Name, S3Key: po.S3Key, S3Bucket: po.S3Bucket, Size: po.Size, ContentType: po.ContentType}, nil
}

func (s *FileService) MoveFile(ctx context.Context, req *pb.MoveFileRequest) (*pb.Empty, error) {
	return &pb.Empty{}, s.folderUC.MoveFile(ctx, req.Id, req.TargetFolderId, req.OwnerId)
}

// === Share operations ===

func (s *FileService) CreateShare(ctx context.Context, req *pb.CreateShareRequest) (*pb.ShareLink, error) {
	po, err := s.shareUC.CreateShare(ctx, req.ResourceType, req.ResourceId, req.Password, nil, nil, req.CreatedBy)
	if err != nil {
		return nil, err
	}
	resp := &pb.ShareLink{Id: po.ID, Token: po.Token, ResourceType: po.ResourceType, ResourceId: po.ResourceID, CreatedBy: po.CreatedBy}
	if po.Password != nil {
		resp.Password = *po.Password
	}
	return resp, nil
}

func (s *FileService) DeleteShare(ctx context.Context, req *pb.DeleteShareRequest) (*pb.Empty, error) {
	return &pb.Empty{}, s.shareUC.DeleteShare(ctx, req.Id, req.CreatedBy)
}

func (s *FileService) GetShareInfo(ctx context.Context, req *pb.GetShareInfoRequest) (*pb.ShareInfo, error) {
	share, file, err := s.shareUC.GetShareInfo(ctx, req.Token)
	if err != nil {
		return nil, err
	}
	resp := &pb.ShareInfo{Token: share.Token, ResourceType: share.ResourceType, FileName: file.Name, FileSize: file.Size, HasPassword: share.Password != nil}
	if share.ExpiresAt != nil {
		resp.ExpiresAt = share.ExpiresAt.UTC().Format("2006-01-02T15:04:05Z")
	}
	return resp, nil
}

func (s *FileService) DownloadShare(ctx context.Context, req *pb.DownloadShareRequest) (*pb.DownloadShareResponse, error) {
	url, fileName, err := s.shareUC.DownloadShare(ctx, req.Token)
	if err != nil {
		return nil, err
	}
	return &pb.DownloadShareResponse{PresignedUrl: url, FileName: fileName}, nil
}

// Helper functions
func stringValue(s *string) string {
	if s == nil {
		return ""
	}
	return *s
}

Note: the file must import "io" in its import block.

  • Step 3: Commit
git add -A
git commit -m "feat: add service layer implementing proto FileService interface"

Task 8: Create the server layer (HTTP + gRPC)

Files:

  • Create: internal/server/server.go

  • Create: internal/server/http.go

  • Create: internal/server/grpc.go

  • Step 1: Create server.go (ProviderSet)

package server

import "github.com/google/wire"

var ProviderSet = wire.NewSet(
	NewHTTPServer,
	NewGRPCServer,
)
  • Step 2: Create http.go (Kratos HTTP server)
package server

import (
	v1 "rag/file-system/api/file/v1"
	"rag/file-system/internal/conf"
	"rag/file-system/internal/service"

	"github.com/go-kratos/kratos/v2/log"
	"github.com/go-kratos/kratos/v2/middleware/logging"
	"github.com/go-kratos/kratos/v2/middleware/recovery"
	"github.com/go-kratos/kratos/v2/middleware/tracing"
	"github.com/go-kratos/kratos/v2/transport/http"
)

func NewHTTPServer(c *conf.Server, svc *service.FileService, logger log.Logger) *http.Server {
	opts := []http.ServerOption{
		http.Middleware(
			recovery.Recovery(),
			tracing.Server(),
			logging.Server(logger),
		),
	}
	if c.Http.Addr != "" {
		opts = append(opts, http.Address(c.Http.Addr))
	}
	if c.Http.Timeout != nil {
		opts = append(opts, http.Timeout(c.Http.Timeout.AsDuration()))
	}
	srv := http.NewServer(opts...)
	v1.RegisterFileServiceHTTPServer(srv, svc)

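	// Escape hatch for file uploads: the binary payload of an upload is a poor
	// fit for proto, so this route uses a native HTTP handler directly.
	// HandleBinaryUpload is not part of the FileService listing in Task 7 and
	// still has to be implemented with the http.HandlerFunc signature.
	srv.HandleFunc("/files/upload/binary", svc.HandleBinaryUpload)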

	return srv
}
  • Step 3: Create grpc.go (Kratos gRPC server)
package server

import (
	v1 "rag/file-system/api/file/v1"
	"rag/file-system/internal/conf"
	"rag/file-system/internal/service"

	"github.com/go-kratos/kratos/v2/log"
	"github.com/go-kratos/kratos/v2/middleware/logging"
	"github.com/go-kratos/kratos/v2/middleware/recovery"
	"github.com/go-kratos/kratos/v2/middleware/tracing"
	"github.com/go-kratos/kratos/v2/transport/grpc"
)

func NewGRPCServer(c *conf.Server, svc *service.FileService, logger log.Logger) *grpc.Server {
	opts := []grpc.ServerOption{
		grpc.Middleware(
			recovery.Recovery(),
			tracing.Server(),
			logging.Server(logger),
		),
	}
	if c.Grpc.Addr != "" {
		opts = append(opts, grpc.Address(c.Grpc.Addr))
	}
	if c.Grpc.Timeout != nil {
		opts = append(opts, grpc.Timeout(c.Grpc.Timeout.AsDuration()))
	}
	srv := grpc.NewServer(opts...)
	v1.RegisterFileServiceServer(srv, svc)
	return srv
}
  • Step 4: Commit
git add -A
git commit -m "feat: add server layer with HTTP and gRPC transport"

Task 9: Create the Watermill CQRS setup

Files:

  • Create: internal/watermark/cqrs_setup.go

  • Create: internal/watermark/commands.go

  • Create: internal/watermark/events.go

  • Create: internal/watermark/handlers.go

  • Step 1: Create commands.go (Command structs)

package watermark

import "io"

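// File operation commands.
// NOTE: commands are serialized before they are stored in PostgreSQL, and an
// io.Reader field cannot be serialized. Verify how Data is meant to travel
// before relying on the io.Reader fields below.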
type UploadFileCommand struct {
	BucketName string
	ObjectKey  string
	Data       io.Reader
}

type DeleteFileCommand struct {
	BucketName string
	ObjectKey  string
}

// Multipart upload commands
type InitMultipartCommand struct {
	BucketName string
	ObjectKey  string
}

type UploadPartCommand struct {
	BucketName string
	ObjectKey  string
	UploadID   string
	PartNumber int32
	Data       io.Reader
}

type CompleteMultipartCommand struct {
	BucketName string
	ObjectKey  string
	UploadID   string
	Parts      []CompletedPart
}

type AbortMultipartCommand struct {
	BucketName string
	ObjectKey  string
	UploadID   string
}

// Bucket operation commands
type CreateBucketCommand struct {
	Name string
}

type DeleteBucketCommand struct {
	Name string
}

// Folder operation commands
type CreateFolderCommand struct {
	ParentID *string
	Name     string
	OwnerID  string
}

type RenameFolderCommand struct {
	ID      string
	Name    string
	OwnerID string
}

type DeleteFolderCommand struct {
	ID      string
	OwnerID string
}

type UploadToFolderCommand struct {
	FolderID    string
	FileName    string
	Data        []byte
	ContentType string
	OwnerID     string
}

type MoveFileCommand struct {
	FileID         string
	TargetFolderID string
	OwnerID        string
}

// Share operation commands
type CreateShareCommand struct {
	ResourceType string
	ResourceID   string
	Password     string
	MaxDownloads *int
	CreatedBy    string
}

type DeleteShareCommand struct {
	ID        string
	CreatedBy string
}

// Shared structs
type CompletedPart struct {
	PartNumber int32
	ETag       string
}
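
Commands sent through the CommandBus are serialized by the marshaler configured in cqrs_setup.go (cqrs.JSONMarshaler, which relies on encoding/json) before they are stored in PostgreSQL. The sketch below uses only the standard library and trimmed copies of two commands to suggest why an io.Reader field is unlikely to survive that round trip, while a []byte field does. The names readerCommand, bytesCommand, roundTripReader and roundTripBytes are hypothetical and exist only for this illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// readerCommand mirrors UploadFileCommand: the payload is an io.Reader.
type readerCommand struct {
	BucketName string
	ObjectKey  string
	Data       io.Reader
}

// bytesCommand mirrors UploadToFolderCommand: the payload is a byte slice.
type bytesCommand struct {
	BucketName string
	ObjectKey  string
	Data       []byte
}

// roundTripReader marshals and unmarshals a readerCommand and returns
// the error (if any) produced along the way.
func roundTripReader(payload string) error {
	in := readerCommand{BucketName: "b", ObjectKey: "k", Data: strings.NewReader(payload)}
	raw, err := json.Marshal(in)
	if err != nil {
		return err
	}
	var out readerCommand
	return json.Unmarshal(raw, &out)
}

// roundTripBytes does the same for bytesCommand and returns the decoded payload.
func roundTripBytes(payload string) (string, error) {
	in := bytesCommand{BucketName: "b", ObjectKey: "k", Data: []byte(payload)}
	raw, err := json.Marshal(in)
	if err != nil {
		return "", err
	}
	var out bytesCommand
	if err := json.Unmarshal(raw, &out); err != nil {
		return "", err
	}
	return string(out.Data), nil
}

func main() {
	fmt.Println("io.Reader round trip failed:", roundTripReader("hello") != nil)
	got, err := roundTripBytes("hello")
	fmt.Println("[]byte round trip:", got, err)
}
```

If streaming uploads are needed, one option is to keep the stream out of the command and pass only a reference (for example an object key already staged in S3); that choice is outside the scope of this plan.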
  • Step 2: Create events.go (Event structs)
package watermark

// File events
type FileUploadedEvent struct {
	BucketName string
	ObjectKey  string
	Size       int64
}

type FileDeletedEvent struct {
	BucketName string
	ObjectKey  string
}

// Bucket events
type BucketCreatedEvent struct {
	Name string
}

type BucketDeletedEvent struct {
	Name string
}

// Folder events
type FolderCreatedEvent struct {
	FolderID string
	Name     string
	OwnerID  string
}

type FolderDeletedEvent struct {
	FolderID string
	OwnerID  string
}

// Share events
type ShareCreatedEvent struct {
	ShareID      string
	ResourceType string
	ResourceID   string
	Token        string
	CreatedBy    string
}

type ShareDownloadedEvent struct {
	Token string
}
  • Step 3: Create cqrs_setup.go (CommandBus + EventBus initialization)
package watermark

import (
	"database/sql"

	"github.com/ThreeDotsLabs/watermill"
	wmsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
	"github.com/ThreeDotsLabs/watermill/components/cqrs"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/go-kratos/kratos/v2/log"
)

// CQRSBus bundles the Watermill buses and the router that will run the handlers.
type CQRSBus struct {
	CommandBus *cqrs.CommandBus
	EventBus   *cqrs.EventBus
	Router     *message.Router
}

// NewCQRSBus builds a CommandBus and an EventBus that publish to PostgreSQL.
func NewCQRSBus(db *sql.DB, logger log.Logger) (*CQRSBus, error) {
	helper := log.NewHelper(logger)
	wmLogger := watermill.NewStdLogger(false, false)

	// PostgreSQL-backed publisher. The constructor and config fields follow
	// watermill-sql v2; verify them against the version pinned in go.mod.
	// The default schema adapter creates one table per topic with a
	// "watermill_" prefix. A matching wmsql.NewSubscriber is needed once
	// command/event processors are registered on the Router.
	publisher, err := wmsql.NewPublisher(db, wmsql.PublisherConfig{
		SchemaAdapter:        wmsql.DefaultPostgreSQLSchema{},
		AutoInitializeSchema: true,
	}, wmLogger)
	if err != nil {
		return nil, err
	}

	router, err := message.NewRouter(message.RouterConfig{}, wmLogger)
	if err != nil {
		return nil, err
	}

	cqrsMarshaler := cqrs.JSONMarshaler{}

	commandBus, err := cqrs.NewCommandBusWithConfig(publisher, cqrs.CommandBusConfig{
		GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
			return "commands." + params.CommandName, nil
		},
		Marshaler: cqrsMarshaler,
		Logger:    wmLogger,
	})
	if err != nil {
		return nil, err
	}

	eventBus, err := cqrs.NewEventBusWithConfig(publisher, cqrs.EventBusConfig{
		GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
			return "events." + params.EventName, nil
		},
		Marshaler: cqrsMarshaler,
		Logger:    wmLogger,
	})
	if err != nil {
		return nil, err
	}

	helper.Info("Watermill CQRS bus initialized with PostgreSQL")
	return &CQRSBus{
		CommandBus: commandBus,
		EventBus:   eventBus,
		Router:     router,
	}, nil
}
  • Step 4: Create handlers.go (CQRS Command/Event handlers)
package watermark

import (
	"github.com/ThreeDotsLabs/watermill/components/cqrs"
	"github.com/go-kratos/kratos/v2/log"
)

// RegisterHandlers registers all Command and Event handlers on the router.
func (b *CQRSBus) RegisterHandlers(processor *cqrs.CommandProcessor, eventProcessor *cqrs.EventProcessor) {
	// Handlers are implemented in later tasks.
	// For now this is an empty stub so that the package compiles.
}

// CQRSHandler holds references to the biz-layer usecases and handles CQRS commands.
type CQRSHandler struct {
	// Usecases are injected in later tasks.
	log *log.Helper
}

func NewCQRSHandler(logger log.Logger) *CQRSHandler {
	return &CQRSHandler{log: log.NewHelper(logger)}
}
  • Step 5: Commit
git add -A
git commit -m "feat: add Watermill CQRS setup with commands, events, and handler stubs"

Task 10: Wire dependency injection + entry point

Files:

  • Create: cmd/server/wire.go

  • Modify: cmd/server/main.go

  • Step 1: Create wire.go

//go:build wireinject

package main

import (
	"rag/file-system/internal/biz"
	"rag/file-system/internal/conf"
	"rag/file-system/internal/data"
	"rag/file-system/internal/server"
	"rag/file-system/internal/service"

	"github.com/go-kratos/kratos/v2"
	"github.com/go-kratos/kratos/v2/log"
	"github.com/google/wire"
)

// initApp is the Wire injector; wire replaces its body in wire_gen.go.
// NewGRPCServer expects a *conf.Server, so one of the provider sets must be
// able to derive it from *conf.Bootstrap (for example with wire.FieldsOf,
// assuming Bootstrap exposes a Server field).
func initApp(*conf.Bootstrap, log.Logger) (*kratos.App, func(), error) {
	panic(wire.Build(
		data.ProviderSet,
		biz.ProviderSet,
		service.ProviderSet,
		server.ProviderSet,
		newApp,
	))
}
  • Step 2: Rewrite main.go
package main

import (
	"flag"
	"os"

	"rag/file-system/internal/conf"

	"github.com/go-kratos/kratos/v2"
	"github.com/go-kratos/kratos/v2/config"
	fileconfig "github.com/go-kratos/kratos/v2/config/file"
	"github.com/go-kratos/kratos/v2/log"
	"github.com/go-kratos/kratos/v2/transport/grpc"
	"github.com/go-kratos/kratos/v2/transport/http"
)

var (
	flagconf string
)

func init() {
	flag.StringVar(&flagconf, "conf", "configs/config.yaml", "config path, eg: -conf config.yaml")
}

// newApp lives in main.go rather than wire.go so that it is still compiled
// when the wireinject build tag is absent and wire_gen.go calls it.
func newApp(logger log.Logger, hs *http.Server, gs *grpc.Server) *kratos.App {
	return kratos.New(
		kratos.Name("file-system"),
		kratos.Logger(logger),
		kratos.Server(hs, gs),
	)
}

func main() {
	flag.Parse()

	logger := log.With(log.NewStdLogger(os.Stdout),
		"ts", log.DefaultTimestamp,
		"caller", log.DefaultCaller,
		"service.kind", "file-system",
	)

	c := config.New(
		config.WithSource(
			fileconfig.NewSource(flagconf),
		),
	)
	defer c.Close()
	if err := c.Load(); err != nil {
		panic(err)
	}

	var bc conf.Bootstrap
	if err := c.Scan(&bc); err != nil {
		panic(err)
	}

	app, cleanup, err := initApp(&bc, logger)
	if err != nil {
		panic(err)
	}
	defer cleanup()

	// Start the HTTP and gRPC servers and block until a stop signal arrives.
	if err := app.Run(); err != nil {
		panic(err)
	}
}
  • Step 3: Run wire to generate the injector
cd /Users/wen/project/rag/file-system/cmd/server && wire

Expected: wire_gen.go is generated
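
Wire reads the provider sets listed in initApp and emits plain constructor calls in dependency order. The sketch below is a hand-written approximation of what such generated code tends to look like. All types and constructors here (Data, Usecase, Service, Server, App, NewData and so on) are simplified stand-ins, not the real ones from internal/data, internal/biz, internal/service or internal/server.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in types for the four layers plus the application.
type Data struct{ dsn string }
type Usecase struct{ data *Data }
type Service struct{ uc *Usecase }
type Server struct{ svc *Service }
type App struct{ srv *Server }

// NewData returns a cleanup function alongside the resource,
// a shape that Kratos data providers commonly use.
func NewData(dsn string) (*Data, func(), error) {
	if dsn == "" {
		return nil, nil, errors.New("empty dsn")
	}
	cleanup := func() { fmt.Println("closing data resources") }
	return &Data{dsn: dsn}, cleanup, nil
}

func NewUsecase(d *Data) *Usecase     { return &Usecase{data: d} }
func NewService(uc *Usecase) *Service { return &Service{uc: uc} }
func NewServer(s *Service) *Server    { return &Server{svc: s} }
func newApp(s *Server) *App           { return &App{srv: s} }

// initApp is written by hand here; Wire would generate an equivalent
// chain of calls and aggregate the cleanup functions.
func initApp(dsn string) (*App, func(), error) {
	data, cleanup, err := NewData(dsn)
	if err != nil {
		return nil, nil, err
	}
	uc := NewUsecase(data)
	svc := NewService(uc)
	srv := NewServer(svc)
	return newApp(srv), func() { cleanup() }, nil
}

func main() {
	app, cleanup, err := initApp("postgres://localhost/file_system")
	if err != nil {
		panic(err)
	}
	defer cleanup()
	fmt.Println("app wired with dsn:", app.srv.svc.uc.data.dsn)
}
```

Because the generated file contains only ordinary function calls, a missing provider shows up as a wire error at generation time rather than as a runtime failure.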

  • Step 4: Build
cd /Users/wen/project/rag/file-system && go build ./cmd/server

Expected: the build succeeds (imports and type mismatches may need adjusting)

  • Step 5: Commit
git add -A
git commit -m "feat: add Wire DI and rewrite main.go as Kratos app entry point"

Task 11: Update Dockerfile and docker-compose.yml

Files:

  • Modify: Dockerfile

  • Modify: docker-compose.yml

  • Step 1: Update Dockerfile

FROM golang:1.25-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 go build -o /bin/file-system ./cmd/server

FROM alpine:latest
RUN apk --no-cache add ca-certificates
COPY --from=builder /bin/file-system /bin/file-system
COPY configs/config.yaml /app/configs/config.yaml
WORKDIR /app
EXPOSE 8080 9000
CMD ["/bin/file-system", "-conf", "configs/config.yaml"]
  • Step 2: Update docker-compose.yml
version: '3.8'
services:
  file-system:
    build: .
    ports:
      - "8080:8080"
      - "9000:9000"
    environment:
      - RUSTFS_ACCESS_KEY_ID=${RUSTFS_ACCESS_KEY_ID}
      - RUSTFS_SECRET_ACCESS_KEY=${RUSTFS_SECRET_ACCESS_KEY}
    volumes:
      - ./configs/config.yaml:/app/configs/config.yaml
    depends_on:
      - postgres
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: file_system
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    ports:
      - "5432:5432"
  • Step 3: Commit
git add Dockerfile docker-compose.yml
git commit -m "chore: update Dockerfile and docker-compose for Kratos binary"

Task 12: Update CLAUDE.md

Files:

  • Modify: CLAUDE.md

  • Step 1: Rewrite CLAUDE.md to reflect the new architecture

# CLAUDE.md

> **Cross-repo rules** — see `/Users/wen/project/rag/CLAUDE.md` for full workspace conventions.

## Build & Run

go run ./cmd/server -conf configs/config.yaml     # HTTP :8080, gRPC :9000
make api                                           # regenerate proto code
make wire                                          # regenerate Wire DI
go test ./...                                      # run all tests
buf generate                                       # regenerate proto

Architecture

Go 1.25 microservice: Kratos framework + four-layer DDD + Watermill CQRS.

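cmd/server/main.go          → entry point, loads configuration
cmd/server/wire.go          → Wire DI declarations
cmd/server/wire_gen.go      → Wire-generated code
api/file/v1/                → Proto definitions (HTTP + gRPC dual protocol)
internal/conf/              → configuration structs (defined in proto)
internal/biz/               → business logic layer (repo interface definitions + usecases)
internal/data/              → data access layer (GORM + S3, implements the biz repo interfaces)
internal/service/           → service implementation layer (implements the proto Service interfaces)
internal/server/            → HTTP/gRPC server construction and middleware
internal/watermark/         → Watermill CQRS (CommandBus + EventBus)
internal/pkg/sanitize/      → input sanitization utilities
internal/pkg/s3errors/      → S3 error mapping
configs/config.yaml         → local development configuration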

Layer call flow

service (DTO conversion) → biz (business logic) → data (data access)
                                ↕
                    watermark (CQRS commands/events)

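Tech stack

  • Kratos: HTTP + gRPC framework, proto-first API definitions
  • Wire: compile-time dependency injection
  • Watermill: CQRS (CommandBus + EventBus), with PostgreSQL as the message store
  • GORM: ORM for business data (folders, files, share_links)
  • AWS SDK v2: S3 client for RustFS/MinIO
  • PostgreSQL: business data + Watermill message queue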

Code Patterns

Wire ProviderSet pattern

Each layer defines a ProviderSet:

// internal/data/data.go
var ProviderSet = wire.NewSet(NewData, NewFileRepo, NewFolderRepo, NewFileMetaRepo, NewShareRepo)

// internal/biz/biz.go
var ProviderSet = wire.NewSet(NewFileUsecase, NewBucketUsecase, NewFolderUsecase, NewShareUsecase)

// internal/service/service.go
var ProviderSet = wire.NewSet(NewFileService)

// internal/server/server.go
var ProviderSet = wire.NewSet(NewHTTPServer, NewGRPCServer)

GORM transaction management

// The biz layer defines the interface
type Transaction interface { InTx(ctx context.Context, fn func(ctx context.Context) error) error }

// The data layer implements it
func (d *Data) DB(ctx context.Context) *gorm.DB {
    tx, ok := ctx.Value(contextTxKey{}).(*gorm.DB)
    if ok { return tx }
    return d.db.WithContext(ctx)
}

Proto error definitions

enum ErrorReason {
  FILE_NOT_FOUND = 0 [(errors.code) = 404];
  INVALID_PARAMETER = 1 [(errors.code) = 400];
}

Usage: return nil, v1.ErrorFileNotFound("file %s not found", key)

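GORM model naming

  • Model names take a PO suffix: FolderPO, FileMetaPO, ShareLinkPO
  • Table names are set to snake_case through the TableName() method

Configuration

configs/config.yaml is for local development. Environment variables are supported through ${ENV_VAR} placeholders.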
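
As a rough illustration of the ${ENV_VAR} placeholder idea, the sketch below substitutes placeholders in a single config line using os.Expand from the standard library. It is a stand-in, not Kratos' own resolver: expandPlaceholders is a hypothetical helper and the access_key field name is an assumption made for the example.

```go
package main

import (
	"fmt"
	"os"
)

// expandPlaceholders replaces ${NAME} with lookup(NAME). Unknown names
// are left in place so that a missing variable stays visible.
func expandPlaceholders(s string, lookup func(string) (string, bool)) string {
	return os.Expand(s, func(name string) string {
		if v, ok := lookup(name); ok {
			return v
		}
		return "${" + name + "}"
	})
}

func main() {
	// access_key is a hypothetical config field used only for illustration.
	line := "access_key: ${RUSTFS_ACCESS_KEY_ID}"
	fmt.Println(expandPlaceholders(line, os.LookupEnv))
}
```

Leaving unresolved placeholders untouched makes a forgotten environment variable easy to spot in logs, at the cost of not failing fast.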


  • Step 2: Commit
git add CLAUDE.md
git commit -m "docs: rewrite CLAUDE.md for Kratos+Watermill architecture"

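Self-check list

| Check item | Status |
| --- | --- |
| All 25 endpoints have a corresponding proto RPC | Task 2 |
| All S3 operations are preserved (12 methods) | Task 5 data/file_repo.go |
| The 3 PostgreSQL tables are managed with GORM | Task 5 data/data.go |
| Recursive folder deletion + S3 cleanup | Task 6 biz/folder.go |
| Share-link password verification and expiry check | Task 6 biz/share.go |
| Input sanitization (object key, bucket name, filename) | Task 4 pkg/sanitize |
| S3 errors mapped to business errors | Task 4 pkg/s3errors |
| Wire DI assembles dependencies automatically | Task 10 |
| HTTP + gRPC dual protocol | Task 8 |
| Kratos built-in middleware (recovery, tracing, logging) | Task 8 |
| Configuration migrated from environment variables to YAML + environment-variable placeholders | Task 1 + 10 |
| Watermill CQRS skeleton | Task 9 |
| No placeholders; all code is complete | |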