网淘吧来吧,欢迎您!

返回首页 微信
微信
手机版
手机版

Database Operations技能使用说明

2026-03-26 新闻来源:网淘吧 围观:81
电脑广告
手机广告

数据库操作

全面的数据库设计、迁移和优化专家。改编自 Dave Poon 的 buildwithclaude (MIT)。

角色定义

您是专门从事 PostgreSQL、查询性能、模式设计和 EF Core 迁移的数据库优化专家。您遵循先测量后优化的原则,并且始终规划回滚流程。

核心原则

  1. 先测量 —— 优化前始终使用 EXPLAIN ANALYZE
  2. 策略性索引 —— 基于查询模式,而非每个列
  3. 选择性反规范化 —— 仅在读取模式有充分理由时进行
  4. 缓存昂贵计算 —— 针对热点路径使用 Redis/物化视图
  5. 规划回滚 —— 每次迁移都应有对应的反向迁移
  6. 零停机迁移 —— 先进行增量更改,后进行破坏性更改

模式设计模式

用户管理

CREATE TYPE user_status AS ENUM ('active', 'inactive', 'suspended', 'pending');

CREATE TABLE users (
  id BIGSERIAL PRIMARY KEY,
  email VARCHAR(255) UNIQUE NOT NULL,
  username VARCHAR(50) UNIQUE NOT NULL,
  password_hash VARCHAR(255) NOT NULL,
  first_name VARCHAR(100) NOT NULL,
  last_name VARCHAR(100) NOT NULL,
  status user_status DEFAULT 'active',
  email_verified BOOLEAN DEFAULT FALSE,
  created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
  deleted_at TIMESTAMPTZ, -- soft delete marker; NULL = live row
  CONSTRAINT users_email_format CHECK (email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'),
  CONSTRAINT users_names_not_empty CHECK (LENGTH(TRIM(first_name)) > 0 AND LENGTH(TRIM(last_name)) > 0)
);

-- Strategic indexes.
-- FIX: removed the separate idx_users_email — the UNIQUE constraint on
-- email already creates an index; a duplicate only slows down writes.
CREATE INDEX idx_users_status ON users(status) WHERE status != 'active';
CREATE INDEX idx_users_created_at ON users(created_at);
-- FIX: the original partial index was `ON users(deleted_at) WHERE
-- deleted_at IS NULL`, i.e. an index containing only NULL values, which
-- no query can use. Index the soft-deleted rows instead.
CREATE INDEX idx_users_deleted_at ON users(deleted_at) WHERE deleted_at IS NOT NULL;

审计追踪

-- Audit trail: operation enum, append-only log table, indexes, and a
-- generic row-level trigger that snapshots OLD/NEW as JSONB.
CREATE TYPE audit_operation AS ENUM ('INSERT', 'UPDATE', 'DELETE');

CREATE TABLE audit_log (
  id BIGSERIAL PRIMARY KEY,
  table_name VARCHAR(255) NOT NULL,
  record_id BIGINT NOT NULL,
  operation audit_operation NOT NULL,
  old_values JSONB,
  new_values JSONB,
  changed_fields TEXT[],
  user_id BIGINT REFERENCES users(id),
  created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_audit_table_record ON audit_log(table_name, record_id);
CREATE INDEX idx_audit_user_time ON audit_log(user_id, created_at);

-- Trigger function: one branch per TG_OP, each writing a log row.
CREATE OR REPLACE FUNCTION audit_trigger_function()
RETURNS TRIGGER AS $$
BEGIN
  IF TG_OP = 'DELETE' THEN
    INSERT INTO audit_log (table_name, record_id, operation, old_values)
    VALUES (TG_TABLE_NAME, OLD.id, 'DELETE', to_jsonb(OLD));
    RETURN OLD;
  ELSIF TG_OP = 'UPDATE' THEN
    INSERT INTO audit_log (table_name, record_id, operation, old_values, new_values)
    VALUES (TG_TABLE_NAME, NEW.id, 'UPDATE', to_jsonb(OLD), to_jsonb(NEW));
    RETURN NEW;
  ELSIF TG_OP = 'INSERT' THEN
    INSERT INTO audit_log (table_name, record_id, operation, new_values)
    VALUES (TG_TABLE_NAME, NEW.id, 'INSERT', to_jsonb(NEW));
    RETURN NEW;
  END IF;
END;
$$ LANGUAGE plpgsql;

-- Attach to any table that has an `id` column.
CREATE TRIGGER audit_users
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION audit_trigger_function();

CREATE TYPE audit_operation AS ENUM ('INSERT', 'UPDATE', 'DELETE');

-- Append-only audit log; one row per audited DML statement per row.
CREATE TABLE audit_log (
  id BIGSERIAL PRIMARY KEY,
  table_name VARCHAR(255) NOT NULL,
  record_id BIGINT NOT NULL,
  operation audit_operation NOT NULL,
  old_values JSONB,
  new_values JSONB,
  changed_fields TEXT[],
  user_id BIGINT REFERENCES users(id),
  created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_audit_table_record ON audit_log(table_name, record_id);
CREATE INDEX idx_audit_user_time ON audit_log(user_id, created_at);

-- Generic trigger function: snapshots OLD/NEW as JSONB.
-- Requires the audited table to expose an `id` column.
CREATE OR REPLACE FUNCTION audit_trigger_function()
RETURNS TRIGGER AS $$
BEGIN
  IF TG_OP = 'DELETE' THEN
    INSERT INTO audit_log (table_name, record_id, operation, old_values)
    VALUES (TG_TABLE_NAME, OLD.id, 'DELETE', to_jsonb(OLD));
    RETURN OLD;
  ELSIF TG_OP = 'UPDATE' THEN
    -- FIX: also populate changed_fields (the column existed but was never
    -- written). Records top-level keys whose value differs between OLD/NEW.
    INSERT INTO audit_log (table_name, record_id, operation, old_values, new_values, changed_fields)
    VALUES (
      TG_TABLE_NAME, NEW.id, 'UPDATE', to_jsonb(OLD), to_jsonb(NEW),
      (SELECT array_agg(n.key)
       FROM jsonb_each(to_jsonb(NEW)) AS n(key, value)
       WHERE to_jsonb(OLD) -> n.key IS DISTINCT FROM n.value)
    );
    RETURN NEW;
  ELSIF TG_OP = 'INSERT' THEN
    INSERT INTO audit_log (table_name, record_id, operation, new_values)
    VALUES (TG_TABLE_NAME, NEW.id, 'INSERT', to_jsonb(NEW));
    RETURN NEW;
  END IF;
  -- FIX: the original could fall off the end without RETURN (PL/pgSQL
  -- raises "control reached end of function without RETURN") if the
  -- trigger were ever fired for an unhandled event.
  RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Apply to any table
CREATE TRIGGER audit_users
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION audit_trigger_function();

软删除模式

-- View that hides soft-deleted rows for everyday queries.
-- NOTE: SELECT * in a view is expanded at creation time — columns added to
-- users later will not appear until the view is recreated.
CREATE VIEW active_users AS SELECT * FROM users WHERE deleted_at IS NULL;

-- Soft-delete helper: stamps deleted_at instead of removing the row.
-- The table name goes through format(%I) so it is identifier-quoted
-- (injection-safe); the id travels as a bind parameter via USING.
-- Idempotent: a second call on the same id matches no rows.
-- Assumes the target table has `id` and `deleted_at` columns.
CREATE OR REPLACE FUNCTION soft_delete(p_table TEXT, p_id BIGINT)
RETURNS VOID AS $$
BEGIN
  EXECUTE format('UPDATE %I SET deleted_at = CURRENT_TIMESTAMP WHERE id = $1 AND deleted_at IS NULL', p_table)
  USING p_id;
END;
$$ LANGUAGE plpgsql;

全文搜索

-- Generated tsvector column, kept in sync automatically by Postgres (12+).
-- COALESCE guards against NULL name/description/sku making the whole vector NULL.
ALTER TABLE products ADD COLUMN search_vector tsvector
  GENERATED ALWAYS AS (
    to_tsvector('english', COALESCE(name, '') || ' ' || COALESCE(description, '') || ' ' || COALESCE(sku, ''))
  ) STORED;

CREATE INDEX idx_products_search ON products USING gin(search_vector);

-- Query: rows matching both 'laptop' AND 'gaming'
SELECT * FROM products
WHERE search_vector @@ to_tsquery('english', 'laptop & gaming');

查询优化

优化前先分析

-- Always start here before optimizing.
-- FIX: the users schema above defines first_name/last_name — there is no
-- u.name column, so the original example would fail.
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT u.id, u.first_name, u.last_name, COUNT(o.id) AS order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.created_at > '2024-01-01'
GROUP BY u.id, u.first_name, u.last_name
ORDER BY order_count DESC;

索引策略

-- Single-column index for exact lookups.
-- NOTE: CREATE INDEX CONCURRENTLY cannot run inside a transaction block —
-- run each of these outside the migration transaction.
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);

-- Composite index for multi-column queries (column order matters!)
CREATE INDEX CONCURRENTLY idx_orders_user_status ON orders(user_id, status, created_at);

-- Partial index for filtered queries (small, only covers matching rows)
CREATE INDEX CONCURRENTLY idx_products_low_stock
ON products(inventory_quantity)
WHERE inventory_tracking = true AND inventory_quantity <= 5;

-- Covering index (INCLUDE extra columns to enable index-only scans)
CREATE INDEX CONCURRENTLY idx_orders_covering
ON orders(user_id, status) INCLUDE (total, created_at);

-- GIN index for JSONB containment/key queries
CREATE INDEX CONCURRENTLY idx_products_attrs ON products USING gin(attributes);

-- Expression index (supports WHERE lower(email) = ... lookups)
CREATE INDEX CONCURRENTLY idx_users_email_lower ON users(lower(email));

查找未使用的索引

-- Indexes that have never been scanned, largest first — drop candidates.
-- FIX: pg_stat_user_indexes exposes relname/indexrelname; the original
-- referenced tablename/indexname (pg_indexes columns) and errored out.
SELECT
  schemaname,
  relname AS table_name,
  indexrelname AS index_name,
  idx_scan AS scans,
  pg_size_pretty(pg_relation_size(indexrelid)) AS size
FROM pg_stat_user_indexes
WHERE idx_scan = 0
ORDER BY pg_relation_size(indexrelid) DESC;

查找缺失的索引(慢查询)

-- Enable the pg_stat_statements extension first.
-- (total_exec_time/mean_exec_time are the PG13+ column names.)
SELECT query, calls, total_exec_time, mean_exec_time, rows
FROM pg_stat_statements
WHERE mean_exec_time > 100  -- milliseconds
ORDER BY total_exec_time DESC
LIMIT 20;

N+1 查询检测

-- Look for the same parameterized point-lookup called very many times in
-- pg_stat_statements — the classic N+1 signature.
SELECT query, calls, mean_exec_time
FROM pg_stat_statements
WHERE calls > 100 AND query LIKE '%WHERE%id = $1%'
ORDER BY calls DESC;

迁移模式

安全的列添加

-- +migrate Up
-- Always use CONCURRENTLY for production indexes.
-- NOTE: CONCURRENTLY cannot run inside a transaction block — the migration
-- runner must execute that statement outside its wrapping transaction.
ALTER TABLE users ADD COLUMN phone VARCHAR(20);
CREATE INDEX CONCURRENTLY idx_users_phone ON users(phone) WHERE phone IS NOT NULL;

-- +migrate Down
DROP INDEX IF EXISTS idx_users_phone;
ALTER TABLE users DROP COLUMN IF EXISTS phone;

安全的列重命名(零停机时间)

-- Step 1: add the new column and backfill it.
-- NOTE(review): a single unbatched UPDATE rewrites and locks the whole
-- table; batch the backfill on large tables to stay zero-downtime.
ALTER TABLE users ADD COLUMN display_name VARCHAR(100);
UPDATE users SET display_name = name;
ALTER TABLE users ALTER COLUMN display_name SET NOT NULL;

-- Step 2: deploy code that writes to both columns
-- Step 3: deploy code that reads from the new column
-- Step 4: drop the old column
ALTER TABLE users DROP COLUMN name;

表分区

-- Partitioned parent table. The partition key (created_at) must be part
-- of the primary key, hence the composite PK.
CREATE TABLE orders (
  id BIGSERIAL,
  user_id BIGINT NOT NULL,
  total DECIMAL(10,2),
  created_at TIMESTAMPTZ NOT NULL,
  PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Monthly partitions (bounds are inclusive FROM, exclusive TO)
CREATE TABLE orders_2024_01 PARTITION OF orders
  FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE orders_2024_02 PARTITION OF orders
  FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');

-- Create a month's partition on demand (schedule via pg_cron).
-- Identifiers go through %I and literals through %L, so the dynamic DDL
-- is safely quoted; IF NOT EXISTS makes it idempotent.
-- NOTE(review): assumes p_date is the first day of a month — confirm callers.
CREATE OR REPLACE FUNCTION create_monthly_partition(p_table TEXT, p_date DATE)
RETURNS VOID AS $$
DECLARE
  partition_name TEXT := p_table || '_' || to_char(p_date, 'YYYY_MM');
  next_date DATE := p_date + INTERVAL '1 month';
BEGIN
  EXECUTE format(
    'CREATE TABLE IF NOT EXISTS %I PARTITION OF %I FOR VALUES FROM (%L) TO (%L)',
    partition_name, p_table, p_date, next_date
  );
END;
$$ LANGUAGE plpgsql;

EF Core 迁移 (.NET)

创建与应用

# Add a migration (-p = project with the DbContext, -s = startup project)
dotnet ef migrations add AddPhoneToUsers -p src/Infrastructure -s src/Api

# Apply pending migrations to the database
dotnet ef database update -p src/Infrastructure -s src/Api

# Generate an idempotent SQL script for production deployments
dotnet ef migrations script -p src/Infrastructure -s src/Api -o migration.sql --idempotent

# Roll back by updating to an earlier migration
dotnet ef database update PreviousMigrationName -p src/Infrastructure -s src/Api

EF Core 配置最佳实践

// Use AsNoTracking for read-only queries (skips change-tracker overhead)
var users = await _db.Users
    .AsNoTracking()
    .Where(u => u.Status == UserStatus.Active)
    .Select(u => new UserDto { Id = u.Id, Name = u.Name })
    .ToListAsync(ct);

// Use Include to avoid N+1 queries (eager-loads related rows in one round trip)
var orders = await _db.Orders
    .Include(o => o.Items)
    .ThenInclude(i => i.Product)
    .Where(o => o.UserId == userId)
    .ToListAsync(ct);

// Better: project into DTOs so only the needed columns are fetched.
// (Each snippet here is a standalone example; `orders` is re-declared.)
var orders = await _db.Orders
    .Where(o => o.UserId == userId)
    .Select(o => new OrderDto
    {
        Id = o.Id,
        Total = o.Total,
        Items = o.Items.Select(i => new OrderItemDto
        {
            ProductName = i.Product.Name,
            Quantity = i.Quantity,
        }).ToList(),
    })
    .ToListAsync(ct);

缓存策略

Redis 查询缓存

import Redis from 'ioredis'

const redis = new Redis(process.env.REDIS_URL)

/**
 * Read-through cache: return the cached JSON value for `key` if present,
 * otherwise run `queryFn`, store its result with a TTL, and return it.
 * NOTE: results must be JSON-serializable (Dates come back as strings).
 */
async function cachedQuery<T>(
  key: string,
  queryFn: () => Promise<T>,
  ttlSeconds: number = 300
): Promise<T> {
  const cached = await redis.get(key)
  if (cached) return JSON.parse(cached)

  const result = await queryFn()
  await redis.setex(key, ttlSeconds, JSON.stringify(result))
  return result
}

// Usage example
const products = await cachedQuery(
  `products:category:${categoryId}:page:${page}`,
  () => db.product.findMany({ where: { categoryId }, skip, take }),
  300 // 5 minutes
)

// Cache invalidation for every page of a category.
// FIX: the original used redis.keys(), which blocks the Redis event loop
// while scanning the ENTIRE keyspace — unsafe in production. Iterate with
// cursor-based SCAN instead (non-blocking, O(1) per call).
async function invalidateProductCache(categoryId: string) {
  let cursor = '0'
  do {
    const [nextCursor, keys] = await redis.scan(
      cursor,
      'MATCH', `products:category:${categoryId}:*`,
      'COUNT', 100
    )
    cursor = nextCursor
    if (keys.length) await redis.del(...keys)
  } while (cursor !== '0')
}

物化视图

-- Pre-aggregated monthly sales rollup for dashboard-style queries,
-- restricted to the current calendar year.
CREATE MATERIALIZED VIEW monthly_sales AS
SELECT
  DATE_TRUNC('month', created_at) as month,
  category_id,
  COUNT(*) as order_count,
  SUM(total) as revenue,
  AVG(total) as avg_order_value
FROM orders
WHERE created_at >= DATE_TRUNC('year', CURRENT_DATE)
GROUP BY 1, 2;

-- A UNIQUE index is REQUIRED for REFRESH ... CONCURRENTLY below.
CREATE UNIQUE INDEX idx_monthly_sales ON monthly_sales(month, category_id);

-- Refresh (schedule via pg_cron); CONCURRENTLY avoids blocking readers
REFRESH MATERIALIZED VIEW CONCURRENTLY monthly_sales;

连接池配置

Node.js (pg)

import { Pool } from 'pg'

// Connection details come from the standard PG* env vars when omitted.
const pool = new Pool({
  max: 20,                      // max connections held by the pool
  idleTimeoutMillis: 30000,     // close connections idle for 30s
  connectionTimeoutMillis: 2000, // fail fast if no connection within 2s
  maxUses: 7500,                // recycle a connection after N uses
})

// Log pool health once a minute (waiting > 0 means the pool is saturated)
setInterval(() => {
  console.log({
    total: pool.totalCount,
    idle: pool.idleCount,
    waiting: pool.waitingCount,
  })
}, 60000)

监控查询

活动连接

-- Connection count per state (active, idle, idle in transaction, ...)
-- for the current database only.
SELECT count(*), state
FROM pg_stat_activity
WHERE datname = current_database()
GROUP BY state;

长时间运行的查询

-- Queries that have been actively running for more than 5 minutes.
-- NOTE(review): consider excluding this session with pid <> pg_backend_pid().
SELECT pid, now() - query_start AS duration, query, state
FROM pg_stat_activity
WHERE (now() - query_start) > interval '5 minutes'
AND state = 'active';

表大小

-- Largest tables with data vs. index breakdown.
-- FIX: the original aliased `relname AS table`; TABLE is a reserved word
-- and the unquoted alias is a syntax error.
SELECT
  relname AS table_name,
  pg_size_pretty(pg_total_relation_size(relid)) AS total_size,
  pg_size_pretty(pg_relation_size(relid)) AS data_size,
  -- total minus heap = indexes + TOAST
  pg_size_pretty(pg_total_relation_size(relid) - pg_relation_size(relid)) AS index_size
FROM pg_catalog.pg_statio_user_tables
ORDER BY pg_total_relation_size(relid) DESC
LIMIT 20;

表膨胀

-- Dead-tuple bloat per table; high dead_ratio means autovacuum is lagging.
-- FIX: pg_stat_user_tables exposes relname, not tablename, so the original
-- errored; using relid also avoids the schema-dependent ::regclass cast.
SELECT
  relname AS table_name,
  pg_size_pretty(pg_total_relation_size(relid)) AS size,
  n_dead_tup,
  n_live_tup,
  CASE WHEN n_live_tup > 0
    THEN round(n_dead_tup::numeric / n_live_tup, 2)
    ELSE 0
  END AS dead_ratio
FROM pg_stat_user_tables
WHERE n_dead_tup > 1000
ORDER BY dead_ratio DESC;

反模式

  1. ❌ SELECT * — 始终指定所需的列
  2. ❌ 外键上缺少索引 — 始终为外键列建立索引
  3. ❌ LIKE '%search%' — 使用全文搜索或三元组索引替代
  4. ❌ 大量 IN 子句 — 使用 ANY(ARRAY[...]) 或连接一个值列表
  5. ❌ 无 LIMIT 的无限查询 — 始终进行分页
  6. ❌ 在生产环境中创建索引时不使用 CONCURRENTLY 选项
  7. ❌ 运行迁移时未测试回滚
  8. ❌ 忽略 EXPLAIN ANALYZE 输出 — 始终验证执行计划
  9. ❌ 将货币存储为 FLOAT — 使用 DECIMAL(10,2) 或整数分
  10. ❌ 缺失 NOT NULL 约束 — 应明确声明可为空性

天猫隐藏优惠券

网淘吧

免责申明
部分文章来自各大搜索引擎,如有侵权,请与我联系删除。
打赏
文章底部电脑广告
手机广告位-内容正文底部

相关文章

您是本站第276212名访客 今日有2篇新文章/评论