@Lumilio-Photos: WAL (Wait-Ahead Logging)
参考项目 流明集
在此后端应用中,API 层只作为接受文件并落盘的操作,负责创建任务,这里的任务通常是复杂的、time-consuming 的。对此,我们可以将任务分成几类,或者可以说是为任务的类型进行一个封装。让我们来看 Task model 的具体实现。
type TaskType string
// 为任务类型创建封装,用字符串会有歧义
const (
TaskTypeUpload TaskType = "UPLOAD"
TaskTypeScan TaskType = "SCAN"
TaskTypeProcess TaskType = "PROCESS"
TaskTypeIndex TaskType = "INDEX"
// ...方便以后添加扩展
)
type Task struct {
TaskID string `json:"taskId"`
Type TaskType `json:"type"` // 任务类型
ClientHash string `json:"clientHash"`
StagedPath string `json:"stagedPath"`
UserID string `json:"userId"`
Timestamp time.Time `json:"timestamp"`
ContentType string `json:"contentType,omitempty"`
FileName string `json:"fileName,omitempty"`
}
这样子,我们就能轻松明了地在 API 层创建任务。例如,将上传任务标记为 TaskTypeUpload。
task := queue.Task{
TaskID: uuid.New().String(),
Type: queue.TaskTypeUpload,
ClientHash: clientHash,
StagedPath: stagingFilePath,
UserID: userID,
Timestamp: time.Now(),
ContentType: header.Header.Get("Content-Type"),
FileName: header.Filename,
}
要深入了解 Write-Ahead Log 的文件持久化队列 TaskQueue 在内存与磁盘上如何配合工作的,我们先要了解为什么我们需要这个架构。
- 持久化:我们在创建一个任务时,并不是在内存中定义一个变量、结构体或者指针,而是直接在磁盘下写下
tasks.wal这个特定的文件。这对该应用作为 NAS 照片管理应用的意义巨大。WAL 确保了每个任务先落盘,再消费;如果进程宕机、重启,worker 仍然能够继续进行任务。Redis 本身是内存型的,虽然也能通过 RDB/AOF 做持久化,但配置复杂度更高,一旦持久化策略没调好,还是可能丢数据或恢复不及时。 - 维护简单,降低依赖:我们不需要额外部署 Redis 或消息中间件,维护简单,在容器化环境下更好管理。这里也是因为我已经有 4 个容器了,我希望克制容器的使用,这有利于保证程序的轻量化设计。
- 内存/磁盘资源取舍:对于大规模的上传高并发场景,例如 10000 张 RAW 照片,Redis 需要保留整个任务队列,会吃大量 RAM。文件 WAL 把历史队列压在磁盘上,这里也是考虑到 NAS 作为高存储、低边缘配置的设备,对 RAM 和磁盘利用率的取舍。
我们还需要了解整个 WAL 的运作过程。
我们在上述代码定义了入队单元,也就是 Task 任务模型,其是我们对任务元数据的封装。接着我们定义 TaskQueue 模型。任务队列会维护两个文件:
tasks.wal:所有未完成任务的追加日志(append-only)tasks.done:已完成任务的 ID 表
在内存中维护了一个带缓冲的 chan Task,大小由 bufferSize 决定。
type TaskQueue struct {
queueDir string
walFile string
doneFile string
tasks chan Task
mutex sync.Mutex
bufferSize int
initialized bool
}
首先,我们将经历初始化。当调用 Initialize() 时,会:
os.MkdirAll确保队列目录存在- 创建(如果不存在)
tasks.wal与tasks.done - 从
tasks.done加载已完成的TaskID到 map - 从
tasks.wal扫描所有未完成的任务,非阻塞地放入taskschannel - 开一个后台 goroutine 周期性地
watchForNewTasks,每秒检查 WAL 文件增长,并将新增行 enqueue
func (q *TaskQueue) Initialize() error {
q.mutex.Lock(); defer q.mutex.Unlock()
if q.initialized { return nil }
// 确保 WAL、done 文件存在
completedTasks, _ := q.loadCompletedTaskIDs()
_ = q.loadExistingTasks(completedTasks)
go q.watchForNewTasks(completedTasks)
q.initialized = true
return nil
}
接着,我们需要将 API 传入的 Task 入队,大致代码如下:
func (q *TaskQueue) EnqueueTask(task Task) error {
q.mutex.Lock(); defer q.mutex.Unlock()
// 1) 设置时间戳
// 2) JSON 序列化
// 3) 以追加方式写入 tasks.wal(每行一个 Task JSON)
file, err := os.OpenFile(q.walFile, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
defer file.Close()
_, err = file.WriteString(string(taskJSON) + "\n")
return err
}
只要有多 goroutine 同时往 WAL(tasks.wal)里写,就必须加锁保证写入原子性,否则两条 JSON 可能会互相“插队”,导致日志文件损坏、后续反序列化失败。
然后 worker 端的主循环会反复调用 GetTask() 拿到一个 Task,并根据其 Type 分发处理。
// GetTask 从 channel 拿任务,channel 里没有则阻塞;返回 ok=false 表示队列已关闭
func (q *TaskQueue) GetTask() (Task, bool) {
task, ok := <-q.tasks
return task, ok
}
最后,我们需要标记完成和定时清理。
func (q *TaskQueue) MarkTaskComplete(taskID string) error {
// 将 taskID 追加到 tasks.done
}
定期例如每日一次调用 CleanupProcessedTasks() 会:
- 读取
tasks.done构建已完成 ID 的集合 - 扫描
tasks.wal,把仍未完成的任务复制到一个临时文件 - 用临时文件替换原
tasks.wal,并清空tasks.done
这样保证 WAL 只保留未完成的任务。
持续监控新的任务:
func (q *TaskQueue) watchForNewTasks(completedTasks map[string]bool) {
lastSize, _ := q.getFileSize(q.walFile)
ticker := time.NewTicker(1*time.Second)
for range ticker.C {
currentSize, _ := q.getFileSize(q.walFile)
if currentSize > lastSize {
// Seek 到 lastSize,读取新增行,反序列化后非阻塞 enqueue
lastSize = currentSize
}
// 并定期刷新 completedTasks map
}
}
保证一旦有新的 EnqueueTask 写入 WAL,就能立刻送到 q.tasks channel,触发消费。