工程
用 SQLite 做轻量任务队列:锁、重试与幂等
SQLite 可以承担内容站和小型系统里的轻量后台任务。本文用一张 jobs 表讲清原子领取、租约校验、重试、死信、锁回收与 worker 数量的取舍。
很多小系统需要后台任务,但不一定需要一套消息中间件。内容站要刷新 sitemap、同步搜索索引、生成缩略图;内部工具要补发 Webhook、重算报表、清理过期数据。这些任务的共同点是:量不大,但必须可恢复、可重试、能看到失败原因。
SQLite 可以承担这类轻量任务队列。它的价值不是吞吐神话,而是把状态、业务数据和任务记录放在同一个事务边界里。少一个服务,少一套备份和监控,小团队会轻很多。但边界也要说清:SQLite 同一时间只有一个写事务提交,没有 SELECT FOR UPDATE SKIP LOCKED,不适合跨多台机器做复杂消费组。
用表结构固定状态机
队列不要只靠内存状态。任务至少需要四个状态:queued 等待执行,running 已被 worker 领取,done 成功,dead 重试耗尽。
CREATE TABLE jobs (
id INTEGER PRIMARY KEY,
type TEXT NOT NULL,
payload TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'queued'
CHECK (status IN ('queued', 'running', 'done', 'dead')),
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 5,
run_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
locked_by TEXT,
locked_at TEXT,
last_error TEXT,
idempotency_key TEXT UNIQUE,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_jobs_pick ON jobs(status, run_at, id);
CREATE INDEX idx_jobs_locked ON jobs(status, locked_at);
payload 前期用 JSON 字符串就够了,等任务类型稳定后再拆列。idempotency_key 用来防止同一业务事件重复入队,例如 post:42:refresh-index。入队时不要先查再插,直接让唯一约束处理并发:
INSERT INTO jobs(type, payload, idempotency_key)
VALUES (?, ?, ?)
ON CONFLICT(idempotency_key) DO NOTHING;
领取任务必须原子化
最常见的错误是先 SELECT id,再 UPDATE status='running'。两个 worker 可能读到同一行,最后谁成功取决于竞态。SQLite 3.35 及以上更稳的写法是用一次 UPDATE ... RETURNING 完成领取:
WITH next AS (
SELECT id
FROM jobs
WHERE status = 'queued'
AND run_at <= CURRENT_TIMESTAMP
ORDER BY run_at, id
LIMIT 1
)
UPDATE jobs
SET status = 'running',
locked_by = ?,
locked_at = CURRENT_TIMESTAMP,
attempts = attempts + 1,
updated_at = CURRENT_TIMESTAMP
WHERE id = (SELECT id FROM next)
AND status = 'queued'
AND run_at <= CURRENT_TIMESTAMP
RETURNING id, type, payload, attempts, max_attempts;
外层再写一遍 status 和 run_at 不是装饰。连接等待写锁时,候选行可能已经被别的 worker 改过;外层条件能让过期候选影响 0 行,而不是把别人的任务重新标成 running。locked_by 也不要只写固定字符串,最好用进程唯一值,例如 hostname:pid:random。
Go 侧只需要把没有任务和真实错误分开:
func Claim(ctx context.Context, db *sql.DB, worker string) (*Job, error) {
var j Job
err := db.QueryRowContext(ctx, claimSQL, worker).Scan(
&j.ID, &j.Type, &j.Payload, &j.Attempts, &j.MaxAttempts,
)
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
if err != nil {
return nil, err
}
return &j, nil
}
领取成功后要立刻结束事务,真正的业务处理放在事务外。不要在写事务里请求第三方接口、生成图片或发邮件。SQLite 的写锁很贵,持锁时间越短,前台页面越不容易被后台任务拖慢。
实际 worker 循环也不必复杂。没有任务时短暂休眠,有任务时按类型分发,处理结果只通过 done 或 fail 写回数据库。这样重启进程不会丢状态,部署新版本也不需要单独迁移队列服务。
func RunWorker(ctx context.Context, db *sql.DB, name string) error {
for ctx.Err() == nil {
job, err := Claim(ctx, db, name)
if err != nil {
return err
}
if job == nil {
time.Sleep(500 * time.Millisecond)
continue
}
if err := Handle(ctx, job); err != nil {
if err := Fail(ctx, db, job.ID, name, job.Attempts, job.MaxAttempts, err.Error()); err != nil {
return err
}
continue
}
res, err := db.ExecContext(ctx, `
UPDATE jobs
SET status = 'done',
locked_by = NULL,
locked_at = NULL,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
AND status = 'running'
AND locked_by = ?
AND attempts = ?`, job.ID, name, job.Attempts)
if err != nil {
return err
}
n, err := res.RowsAffected()
if err != nil {
return err
}
if n == 0 {
continue
}
}
return ctx.Err()
}
这里用 locked_by + attempts 做租约校验。回收逻辑可能把超时的 running 任务放回 queued,如果旧 worker 此时才完成,更新语句应该影响 0 行,而不是把新 worker 刚领取的任务标成 done 或 dead。业务处理失败会通过 Fail 转成重试;写回失败是队列自身异常,应该暴露给监控或 supervisor,而不是让当前进程在半处理状态里假装成功。
重试是主路径
后台任务默认就是至少一次执行。worker 可能崩溃,HTTP 请求可能超时,任务也可能因为锁过期而被重新领取。即使用租约保护,业务动作也可能已经对外部系统生效后才写回失败。因此 handler 必须幂等:刷新索引可以按文章 ID 覆盖写;发送 Webhook 要带业务侧去重键;生成文件要写临时文件后再原子替换。
成功时标记 done,失败时根据次数决定重试或进入 dead:
func Fail(ctx context.Context, db *sql.DB, id int64, worker string, attempts, max int, msg string) error {
status := "queued"
delay := time.Duration(attempts*attempts) * time.Minute
if attempts >= max {
status = "dead"
delay = 0
}
res, err := db.ExecContext(ctx, `
UPDATE jobs
SET status = ?,
run_at = datetime('now', ?),
locked_by = NULL,
locked_at = NULL,
last_error = ?,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
AND status = 'running'
AND locked_by = ?
AND attempts = ?;`,
status, fmt.Sprintf("+%d seconds", int(delay.Seconds())), msg, id, worker, attempts)
if err != nil {
return err
}
n, err := res.RowsAffected()
if err != nil {
return err
}
if n == 0 {
return nil
}
return nil
}
last_error 不要省。没有它,审核后台只能看到失败数量,排查时还要翻日志;有了它,认证过期、格式错误、远端 429 会直接浮出来。
dead 也不要急着删除。它不是垃圾状态,而是人工介入入口。后台可以提供两个动作:查看 payload 和错误后重新入队,或者确认无价值后归档。重新入队只需要把 status 改回 queued,清空锁字段,并按需要把 attempts 归零。这样一次外部接口配置错误不会把任务永久丢掉,也不会靠手写 SQL 在生产库里临时修补。
成功任务可以保留一段时间再清理。比如内容站保留 7 天,内部结算任务保留 90 天。保留期不是越长越好,因为队列表会持续写入,索引也会变大。更稳的做法是定时删除足够旧的 done 记录,只长期保留 dead 和少量近期样本。小系统的审计价值和数据库体积,需要在这里明确取舍。
回收卡住的任务
进程退出时不一定能执行清理钩子,所以需要一个定时回收逻辑,把长时间停在 running 的任务放回队列:
UPDATE jobs
SET status = 'queued',
locked_by = NULL,
locked_at = NULL,
run_at = CURRENT_TIMESTAMP,
last_error = 'worker lock timeout',
updated_at = CURRENT_TIMESTAMP
WHERE status = 'running'
AND locked_at < datetime('now', '-10 minutes');
超时时间要按任务类型估算。刷新 sitemap 可以短一点,批量导入图片要长一点。简单系统可以先用固定值,等任务类型变多后再加 timeout_seconds 字段。
配置和 worker 数量要克制
启动时建议明确设置:
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA busy_timeout = 5000;
WAL 让读请求不必频繁等待写事务;busy_timeout 给短暂锁冲突一点缓冲;synchronous=NORMAL 是许多 Web 应用在性能和持久性之间的合理折中。数据库文件不要放在网络盘上,备份也要走 SQLite 的备份接口或 VACUUM INTO,不要运行中只复制主库文件。
worker 数量不是越多越好。SQLite 写入最终仍会串行,十几个 worker 往往只是在制造锁竞争。对内容站来说,一个领取循环加少量业务 goroutine 通常足够。如果任务主要是外部 I/O,可以在业务层并发;如果任务主要是更新同一个 SQLite 文件,并发只会把等待时间搬到锁上。
可观测性也要落在表上。后台页面至少显示 queued 数量、最老 run_at、dead 数量和最近的 last_error。这四个值比一串 worker 日志更有用:queued 持续增长说明消费能力不够;最老任务越来越旧说明有任务饿死;dead 增长说明外部依赖或数据格式出了问题。小系统不必一开始接入复杂监控,但不能没有一眼看懂的健康信号。
真正需要 RabbitMQ、Redis Stream 或 Kafka 的信号也很清楚:跨多机公平消费、百万级延迟任务、复杂优先级、消费组语义,或者你已经在为锁竞争写大量补丁。
SQLite 任务队列的目标是让小系统保持小。当一张表、一组索引、一次原子领取和一套重试策略能讲清问题,它就是合适的工具;当例外规则比主流程还多,就该把队列从数据库里拆出去。