Skip to content

PostgreSQL 实战案例

用户管理系统

需求分析

  • 用户注册、登录、修改密码
  • 用户角色管理
  • 用户状态管理

表设计

sql
-- Roles table: one row per named role that a user can reference via users.role_id.
CREATE TABLE roles (
    id SERIAL PRIMARY KEY,
    name VARCHAR(50) UNIQUE NOT NULL,  -- role name; duplicates are rejected by the UNIQUE constraint
    description TEXT,                  -- optional free-form description
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Users table: account credentials, optional role, and lifecycle status.
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,  -- expected to hold an already-hashed password (see register_user)
    role_id INTEGER REFERENCES roles(id), -- nullable: a user may have no role
    -- status is restricted to the three values listed in the CHECK; new rows default to 'active'
    status VARCHAR(20) DEFAULT 'active' CHECK (status IN ('active', 'inactive', 'banned')),
    last_login_at TIMESTAMPTZ,            -- set by login_user on successful login
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()  -- maintained by the update_users_timestamp trigger
);

-- uuid_generate_v4() is provided by the uuid-ossp extension, not by core
-- PostgreSQL; make sure it is installed before the table default references it.
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

-- User sessions table: one row per issued login session.
-- Sessions are removed automatically when the owning user is deleted.
CREATE TABLE user_sessions (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
    token VARCHAR(255) UNIQUE NOT NULL,  -- session token handed to the client
    ip_address INET,                     -- optional client address captured at login
    user_agent TEXT,                     -- optional client user agent captured at login
    expires_at TIMESTAMPTZ NOT NULL,     -- instant after which the session is no longer valid
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Secondary indexes for the user tables.
-- users(email) and user_sessions(token) are deliberately NOT indexed here:
-- each column carries a UNIQUE constraint, and PostgreSQL already backs every
-- UNIQUE constraint with a unique B-tree index. A second index on the same
-- column would only add write overhead and storage.
CREATE INDEX idx_users_status ON users(status);
CREATE INDEX idx_sessions_user_id ON user_sessions(user_id);
CREATE INDEX idx_sessions_expires ON user_sessions(expires_at);

基础功能实现

sql
-- Trigger function used by the *_timestamp triggers: stamps updated_at with
-- the current transaction time on every row update.
-- NOTE(review): defined here because no definition is visible in this file;
-- if the project already ships update_timestamp() elsewhere, confirm the two
-- definitions agree before applying (CREATE OR REPLACE overwrites it).
CREATE OR REPLACE FUNCTION update_timestamp() RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at := NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Keep users.updated_at current on every UPDATE.
CREATE TRIGGER update_users_timestamp
    BEFORE UPDATE ON users
    FOR EACH ROW EXECUTE FUNCTION update_timestamp();

-- Register a new user and return the generated users.id.
--
-- Parameters:
--   p_username  desired user name (must be unique)
--   p_email     e-mail address (must be unique)
--   p_password  value stored verbatim in users.password_hash; the caller is
--               expected to pass an already-hashed password
--   p_role_id   optional role reference
-- Raises:
--   '用户名已存在' when the user name is taken,
--   '邮箱已被注册' when the e-mail is taken.
CREATE OR REPLACE FUNCTION register_user(
    p_username VARCHAR,
    p_email VARCHAR,
    p_password VARCHAR,
    p_role_id INTEGER DEFAULT NULL
) RETURNS INTEGER AS $$
DECLARE
    v_user_id INTEGER;
BEGIN
    -- Fast-path checks that produce friendly messages in the common case.
    IF EXISTS (SELECT 1 FROM users WHERE username = p_username) THEN
        RAISE EXCEPTION '用户名已存在';
    END IF;

    IF EXISTS (SELECT 1 FROM users WHERE email = p_email) THEN
        RAISE EXCEPTION '邮箱已被注册';
    END IF;

    -- The checks above cannot see rows inserted concurrently, so the UNIQUE
    -- constraints remain the real guard. Translate a constraint failure into
    -- the same messages instead of leaking a raw unique_violation.
    BEGIN
        INSERT INTO users (username, email, password_hash, role_id)
        VALUES (p_username, p_email, p_password, p_role_id)
        RETURNING id INTO v_user_id;
    EXCEPTION
        WHEN unique_violation THEN
            IF EXISTS (SELECT 1 FROM users WHERE username = p_username) THEN
                RAISE EXCEPTION '用户名已存在';
            ELSIF EXISTS (SELECT 1 FROM users WHERE email = p_email) THEN
                RAISE EXCEPTION '邮箱已被注册';
            ELSE
                -- Not a username/e-mail collision we can identify: re-raise as is.
                RAISE;
            END IF;
    END;

    RETURN v_user_id;
END;
$$ LANGUAGE plpgsql;

-- Authenticate an active user and open a 7-day session.
--
-- Parameters:
--   p_username    user name to look up (only rows with status = 'active' match)
--   p_password    value compared verbatim with users.password_hash
--                 (a real deployment should verify with bcrypt or similar)
--   p_ip          optional client address stored on the session
--   p_user_agent  optional client user agent stored on the session
-- Returns: one row with the new session token and its expiry instant.
-- Raises:  '用户名或密码错误' for an unknown/inactive user or a wrong password.
CREATE OR REPLACE FUNCTION login_user(
    p_username VARCHAR,
    p_password VARCHAR,
    p_ip INET DEFAULT NULL,
    p_user_agent TEXT DEFAULT NULL
) RETURNS TABLE(token UUID, expires_at TIMESTAMPTZ) AS $$
DECLARE
    v_user_id INTEGER;
    v_password_hash VARCHAR;
    v_session_token UUID;
    v_expires_at TIMESTAMPTZ;
BEGIN
    -- Look up the user; columns are qualified so they can never be confused
    -- with the OUT columns of this function.
    SELECT u.id, u.password_hash INTO v_user_id, v_password_hash
    FROM users AS u
    WHERE u.username = p_username AND u.status = 'active';

    IF NOT FOUND THEN
        RAISE EXCEPTION '用户名或密码错误';
    END IF;

    -- IS DISTINCT FROM is NULL-safe. With "!=" a NULL p_password makes the
    -- comparison NULL, the IF is skipped, and the login would succeed
    -- without any password at all.
    IF v_password_hash IS DISTINCT FROM p_password THEN
        RAISE EXCEPTION '用户名或密码错误';
    END IF;

    -- Record the successful login.
    UPDATE users SET last_login_at = NOW() WHERE id = v_user_id;

    -- Create the session.
    v_session_token := uuid_generate_v4();
    v_expires_at := NOW() + INTERVAL '7 days';

    INSERT INTO user_sessions (user_id, token, ip_address, user_agent, expires_at)
    VALUES (v_user_id, v_session_token, p_ip, p_user_agent, v_expires_at);

    RETURN QUERY SELECT v_session_token, v_expires_at;
END;
$$ LANGUAGE plpgsql;

-- Change a user's password after verifying the current one, then drop all of
-- the user's sessions so every client has to log in again.
--
-- Parameters:
--   p_user_id       users.id of the account
--   p_old_password  value compared verbatim with users.password_hash
--   p_new_password  value stored verbatim in users.password_hash
-- Returns: TRUE on success.
-- Raises:  '原密码错误' when the user does not exist or the old password
--          does not match.
CREATE OR REPLACE FUNCTION change_password(
    p_user_id INTEGER,
    p_old_password VARCHAR,
    p_new_password VARCHAR
) RETURNS BOOLEAN AS $$
DECLARE
    v_old_hash VARCHAR;
BEGIN
    SELECT u.password_hash INTO v_old_hash FROM users AS u WHERE u.id = p_user_id;

    -- NOT FOUND covers an unknown user id; IS DISTINCT FROM keeps the
    -- comparison NULL-safe. With "!=" a NULL on either side made the check
    -- evaluate to NULL, so the function fell through and returned TRUE.
    IF NOT FOUND OR v_old_hash IS DISTINCT FROM p_old_password THEN
        RAISE EXCEPTION '原密码错误';
    END IF;

    UPDATE users SET password_hash = p_new_password WHERE id = p_user_id;

    -- Invalidate every existing session of this user.
    DELETE FROM user_sessions WHERE user_id = p_user_id;

    RETURN TRUE;
END;
$$ LANGUAGE plpgsql;

-- Return one page of users, newest first, together with paging metadata.
--
-- Parameters:
--   p_page       1-based page number (values below 1 are treated as page 1
--                for the offset; the value is echoed back unchanged)
--   p_page_size  number of rows per page
--   p_status     optional status filter; NULL means "all statuses"
-- Returns a single row:
--   total      number of users matching the filter
--   page       p_page as passed in
--   page_size  p_page_size as passed in
--   data       JSON array of {id, username, email, role_name, status,
--              created_at}; an empty array when the page has no rows
CREATE OR REPLACE FUNCTION get_users_list(
    p_page INTEGER DEFAULT 1,
    p_page_size INTEGER DEFAULT 20,
    p_status VARCHAR DEFAULT NULL
) RETURNS TABLE(
    total BIGINT,
    page INTEGER,
    page_size INTEGER,
    data JSON
) AS $$
DECLARE
    v_offset INTEGER;
    v_total BIGINT;
BEGIN
    -- Clamp at 0 so p_page < 1 does not fail with "OFFSET must not be negative".
    v_offset := GREATEST((p_page - 1) * p_page_size, 0);

    -- Single count using the same predicate as the page query.
    SELECT COUNT(*) INTO v_total
    FROM users AS u
    WHERE p_status IS NULL OR u.status = p_status;

    RETURN QUERY
    SELECT
        v_total,
        p_page,
        p_page_size,
        -- json_agg over zero rows yields NULL; callers get [] instead.
        COALESCE(json_agg(row_to_json(t)), '[]'::JSON)
    FROM (
        SELECT u.id, u.username, u.email, r.name AS role_name, u.status, u.created_at
        FROM users AS u
        LEFT JOIN roles AS r ON u.role_id = r.id
        WHERE (p_status IS NULL OR u.status = p_status)
        ORDER BY u.created_at DESC
        LIMIT p_page_size OFFSET v_offset
    ) AS t;
END;
$$ LANGUAGE plpgsql;

订单系统

表设计

sql
-- Products table: catalogue entries with price and current stock level.
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(200) NOT NULL,
    description TEXT,
    price DECIMAL(10,2) NOT NULL CHECK (price >= 0),      -- unit price; negative values rejected
    stock INTEGER NOT NULL DEFAULT 0 CHECK (stock >= 0),  -- units on hand; can never go below zero
    category_id INTEGER,                                  -- no foreign key is declared for this column here
    status VARCHAR(20) DEFAULT 'active',                  -- create_order only sells rows with status = 'active'
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()                  -- maintained by the update_products_timestamp trigger
);

-- Orders table: one row per customer order with its lifecycle status.
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES users(id),
    order_no VARCHAR(50) UNIQUE NOT NULL,  -- human-readable order number generated by create_order
    -- status is limited to the six lifecycle values below; new orders start as 'pending'
    status VARCHAR(20) DEFAULT 'pending' CHECK (status IN (
        'pending', 'paid', 'shipped', 'delivered', 'cancelled', 'refunded'
    )),
    total_amount DECIMAL(10,2) NOT NULL CHECK (total_amount >= 0),
    paid_at TIMESTAMPTZ,       -- set by pay_order
    shipped_at TIMESTAMPTZ,
    delivered_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()  -- maintained by the update_orders_timestamp trigger
);

-- Order items table: the individual product lines of an order.
-- Lines are deleted together with their order (ON DELETE CASCADE).
CREATE TABLE order_items (
    id SERIAL PRIMARY KEY,
    order_id INTEGER NOT NULL REFERENCES orders(id) ON DELETE CASCADE,
    product_id INTEGER NOT NULL REFERENCES products(id),
    quantity INTEGER NOT NULL CHECK (quantity > 0),
    unit_price DECIMAL(10,2) NOT NULL CHECK (unit_price >= 0),  -- price captured when the order was created
    subtotal DECIMAL(10,2) NOT NULL CHECK (subtotal >= 0),      -- written by create_order as unit_price * quantity
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Stock change log: an audit row for every stock movement of a product.
CREATE TABLE stock_logs (
    id SERIAL PRIMARY KEY,
    product_id INTEGER NOT NULL REFERENCES products(id),
    order_id INTEGER REFERENCES orders(id),  -- nullable: a movement need not belong to an order
    change_type VARCHAR(20) NOT NULL CHECK (change_type IN ('increase', 'decrease', 'lock')),
    quantity INTEGER NOT NULL,               -- signed: create_order writes negative values, cancel_order positive ones
    reason VARCHAR(200),
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Secondary indexes for the order tables: lookups by user, status and
-- creation time on orders, and by parent/product on the detail and log tables.
CREATE INDEX idx_orders_user_id ON orders(user_id);
CREATE INDEX idx_orders_status ON orders(status);
CREATE INDEX idx_orders_created ON orders(created_at DESC);
CREATE INDEX idx_order_items_order_id ON order_items(order_id);
CREATE INDEX idx_order_items_product_id ON order_items(product_id);
CREATE INDEX idx_stock_logs_product_id ON stock_logs(product_id);

-- Triggers: run update_timestamp() before every row update on products and
-- orders. update_timestamp() is not defined in this section and must exist
-- before these statements are executed.
CREATE TRIGGER update_products_timestamp
    BEFORE UPDATE ON products
    FOR EACH ROW EXECUTE FUNCTION update_timestamp();

CREATE TRIGGER update_orders_timestamp
    BEFORE UPDATE ON orders
    FOR EACH ROW EXECUTE FUNCTION update_timestamp();

核心功能

sql
-- Sequence that supplies the numeric suffix of generated order numbers.
CREATE SEQUENCE IF NOT EXISTS order_seq;

-- Create an order for a user from a JSON list of items, reserving stock.
-- Everything runs inside the caller's transaction: any exception rolls back
-- the order, its items, the stock deductions and the stock log rows together.
--
-- Parameters:
--   p_user_id  users.id of the buyer
--   p_items    JSON array of objects with integer "product_id" and
--              positive integer "quantity"
-- Returns: one row with the new orders.id and the generated order number.
-- Raises:
--   '订单商品不能为空'         when p_items is NULL, not an array, or empty
--   '商品数量无效: <id>'       when an item has no product_id or a non-positive quantity
--   '商品不存在或已下架: <id>'  when a product is missing or not 'active'
--   '库存不足: 商品ID <id>'     when stock is lower than the requested quantity
CREATE OR REPLACE FUNCTION create_order(
    p_user_id INTEGER,
    p_items JSONB  -- [{"product_id": 1, "quantity": 2}, ...]
) RETURNS TABLE(order_id INTEGER, order_no VARCHAR) AS $$
DECLARE
    v_order_id INTEGER;
    v_order_no VARCHAR(50);
    v_item JSONB;
    v_product_id INTEGER;
    v_quantity INTEGER;
    v_unit_price DECIMAL(10,2);
    v_stock INTEGER;
    v_subtotal DECIMAL(10,2);
    v_total DECIMAL(10,2) := 0;
BEGIN
    -- Reject input that would otherwise create an empty, zero-amount order.
    IF p_items IS NULL OR jsonb_typeof(p_items) <> 'array' THEN
        RAISE EXCEPTION '订单商品不能为空';
    END IF;
    IF jsonb_array_length(p_items) = 0 THEN
        RAISE EXCEPTION '订单商品不能为空';
    END IF;

    -- Order number: 'ORD' + timestamp to the second + zero-padded sequence value.
    v_order_no := 'ORD' || TO_CHAR(NOW(), 'YYYYMMDDHH24MISS') ||
                  LPAD(NEXTVAL('order_seq')::TEXT, 6, '0');

    -- Insert the order header first so items and stock logs can reference
    -- its id directly; the real total is written once all items are priced.
    INSERT INTO orders (user_id, order_no, total_amount, status)
    VALUES (p_user_id, v_order_no, 0, 'pending')
    RETURNING id INTO v_order_id;

    -- Process items in product_id order so concurrent orders take their
    -- product row locks in the same sequence, which avoids deadlocks.
    FOR v_item IN
        SELECT t.elem
        FROM jsonb_array_elements(p_items) AS t(elem)
        ORDER BY (t.elem->>'product_id')::INTEGER
    LOOP
        v_product_id := (v_item->>'product_id')::INTEGER;
        v_quantity := (v_item->>'quantity')::INTEGER;

        IF v_product_id IS NULL OR v_quantity IS NULL OR v_quantity <= 0 THEN
            RAISE EXCEPTION '商品数量无效: %', v_product_id;
        END IF;

        -- Lock the product row, then read price and stock under that lock.
        SELECT p.price, p.stock INTO v_unit_price, v_stock
        FROM products AS p
        WHERE p.id = v_product_id AND p.status = 'active'
        FOR UPDATE;

        IF NOT FOUND THEN
            RAISE EXCEPTION '商品不存在或已下架: %', v_product_id;
        END IF;

        IF v_stock < v_quantity THEN
            RAISE EXCEPTION '库存不足: 商品ID %', v_product_id;
        END IF;

        -- Deduct stock.
        UPDATE products AS p
        SET stock = p.stock - v_quantity
        WHERE p.id = v_product_id;

        v_subtotal := v_unit_price * v_quantity;
        v_total := v_total + v_subtotal;

        INSERT INTO order_items (order_id, product_id, quantity, unit_price, subtotal)
        VALUES (v_order_id, v_product_id, v_quantity, v_unit_price, v_subtotal);

        -- Log the movement against this order right away, so no later
        -- back-fill by "order_id IS NULL" is needed (which could also
        -- touch unrelated log rows of the same product).
        INSERT INTO stock_logs (product_id, order_id, change_type, quantity, reason)
        VALUES (v_product_id, v_order_id, 'lock', -v_quantity, '创建订单 ' || v_order_no);
    END LOOP;

    UPDATE orders AS o
    SET total_amount = v_total
    WHERE o.id = v_order_id;

    RETURN QUERY SELECT v_order_id, v_order_no;
END;
$$ LANGUAGE plpgsql;

-- Mark a pending order as paid.
-- The order row is locked for the rest of the transaction before its status
-- is inspected, so two concurrent payments cannot both succeed.
-- Returns TRUE on success.
-- Raises '订单不存在' for an unknown id and '订单状态不允许支付: <status>'
-- when the order is in any state other than 'pending'.
CREATE OR REPLACE FUNCTION pay_order(p_order_id INTEGER) RETURNS BOOLEAN AS $$
DECLARE
    v_current_status orders.status%TYPE;
BEGIN
    SELECT o.status
    INTO v_current_status
    FROM orders AS o
    WHERE o.id = p_order_id
    FOR UPDATE;

    IF NOT FOUND THEN
        RAISE EXCEPTION '订单不存在';
    END IF;

    IF v_current_status <> 'pending' THEN
        RAISE EXCEPTION '订单状态不允许支付: %', v_current_status;
    END IF;

    UPDATE orders AS o
    SET status = 'paid',
        paid_at = NOW()
    WHERE o.id = p_order_id;

    RETURN TRUE;
END;
$$ LANGUAGE plpgsql;

-- Cancel a pending or paid order and return its items to stock.
-- Returns TRUE on success.
-- Raises '订单不存在' for an unknown id and '订单状态不允许取消: <status>'
-- when the order is in any state other than 'pending' or 'paid'.
CREATE OR REPLACE FUNCTION cancel_order(p_order_id INTEGER) RETURNS BOOLEAN AS $$
DECLARE
    v_status orders.status%TYPE;
BEGIN
    -- Lock the order row so concurrent pay/cancel calls are serialized.
    SELECT o.status INTO v_status
    FROM orders AS o
    WHERE o.id = p_order_id
    FOR UPDATE;

    IF NOT FOUND THEN
        RAISE EXCEPTION '订单不存在';
    END IF;

    IF v_status NOT IN ('pending', 'paid') THEN
        RAISE EXCEPTION '订单状态不允许取消: %', v_status;
    END IF;

    -- Restore stock. Quantities are summed per product first: UPDATE ... FROM
    -- applies only ONE joined row to each target row, so an order holding the
    -- same product on several lines would otherwise be under-restored.
    UPDATE products AS p
    SET stock = p.stock + restored.quantity
    FROM (
        SELECT oi.product_id, SUM(oi.quantity)::INTEGER AS quantity
        FROM order_items AS oi
        WHERE oi.order_id = p_order_id
        GROUP BY oi.product_id
    ) AS restored
    WHERE p.id = restored.product_id;

    -- Log one stock movement per order line.
    INSERT INTO stock_logs (product_id, order_id, change_type, quantity, reason)
    SELECT oi.product_id, p_order_id, 'increase', oi.quantity, '取消订单'
    FROM order_items AS oi
    WHERE oi.order_id = p_order_id;

    UPDATE orders AS o SET status = 'cancelled' WHERE o.id = p_order_id;

    RETURN TRUE;
END;
$$ LANGUAGE plpgsql;

-- Order statistics for orders created in [p_start_date, p_end_date] (both
-- dates inclusive; the upper bound is applied as "< end date + 1 day").
--
-- Always returns exactly one row:
--   total_orders  number of orders in the range
--   total_amount  sum of orders.total_amount (0 when there are no orders)
--   avg_amount    average order amount (0 when there are no orders)
--   by_status     JSON object mapping each status to its order count
--                 ({} when there are no orders)
CREATE OR REPLACE FUNCTION get_order_stats(
    p_start_date DATE,
    p_end_date DATE
) RETURNS TABLE(
    total_orders BIGINT,
    total_amount DECIMAL,
    avg_amount DECIMAL,
    by_status JSON
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        COALESCE(SUM(s.order_count), 0)::BIGINT,
        COALESCE(SUM(s.amount_sum), 0),
        -- Overall average = total amount / total count; NULLIF guards the
        -- empty range against division by zero.
        COALESCE(SUM(s.amount_sum) / NULLIF(SUM(s.order_count), 0), 0),
        COALESCE(
            json_object_agg(s.order_status, s.order_count)
                FILTER (WHERE s.order_status IS NOT NULL),
            '{}'::JSON
        )
    FROM (
        -- One row per status. Every column is table-qualified and aliased so
        -- nothing collides with the OUT columns (total_amount in particular).
        SELECT
            o.status AS order_status,
            COUNT(*) AS order_count,
            SUM(o.total_amount) AS amount_sum
        FROM orders AS o
        WHERE o.created_at >= p_start_date
          AND o.created_at < p_end_date + INTERVAL '1 day'
        GROUP BY o.status
    ) AS s;
END;
$$ LANGUAGE plpgsql;

日志分析系统

表设计

sql
-- Access log table, range-partitioned on created_at.
-- No primary key is declared; id is only an auto-incrementing BIGSERIAL value.
CREATE TABLE access_logs (
    id BIGSERIAL,
    ip_address INET,
    user_id INTEGER,
    method VARCHAR(10),
    path TEXT,
    status_code INTEGER,
    response_time INTEGER,  -- milliseconds
    user_agent TEXT,
    referer TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW()
) PARTITION BY RANGE (created_at);

-- Monthly partitions; each range includes its lower bound and excludes its upper bound.
-- Only January and February 2024 are created here; further months can be
-- added with create_monthly_partition / create_future_partitions.
CREATE TABLE access_logs_2024_01 PARTITION OF access_logs
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

CREATE TABLE access_logs_2024_02 PARTITION OF access_logs
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');

-- Error log table: application errors and warnings.
CREATE TABLE error_logs (
    id BIGSERIAL PRIMARY KEY,
    level VARCHAR(20),   -- get_error_trend counts the values 'ERROR' and 'WARNING'
    message TEXT,
    stack_trace TEXT,
    user_id INTEGER,     -- no foreign key is declared for this column
    request_id UUID,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Indexes for filtering by level and for newest-first scans.
CREATE INDEX idx_error_logs_level ON error_logs(level);
CREATE INDEX idx_error_logs_created ON error_logs(created_at DESC);

分析功能

sql
-- Page-view / unique-visitor statistics for a single calendar day.
-- Returns one row:
--   pv                 number of access_logs rows on p_date
--   uv                 number of distinct ip_address values on p_date
--   avg_response_time  mean response_time (NULL when there are no rows)
CREATE OR REPLACE FUNCTION get_access_stats(
    p_date DATE
) RETURNS TABLE(
    pv BIGINT,
    uv BIGINT,
    avg_response_time DECIMAL
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        COUNT(*),
        COUNT(DISTINCT al.ip_address),
        CAST(AVG(al.response_time) AS DECIMAL)
    FROM access_logs AS al
    WHERE al.created_at >= p_date
      AND al.created_at < p_date + INTERVAL '1 day';  -- half-open one-day window
END;
$$ LANGUAGE plpgsql;

-- Most visited paths between p_start_date and p_end_date (both inclusive).
--
-- Parameters:
--   p_start_date, p_end_date  date range; the upper bound is applied as
--                             "< end date + 1 day"
--   p_limit                   maximum number of paths to return
-- Returns one row per path, highest page-view count first:
--   path, pv (row count), uv (distinct ip_address count),
--   avg_response_time (mean response_time)
CREATE OR REPLACE FUNCTION get_popular_pages(
    p_start_date DATE,
    p_end_date DATE,
    p_limit INTEGER DEFAULT 10
) RETURNS TABLE(
    path TEXT,
    pv BIGINT,
    uv BIGINT,
    avg_response_time DECIMAL
) AS $$
BEGIN
    -- Every column is qualified with the table alias: an unqualified "path"
    -- is ambiguous between access_logs.path and the OUT column of the same
    -- name, and PL/pgSQL rejects the query at run time.
    RETURN QUERY
    SELECT
        al.path,
        COUNT(*),
        COUNT(DISTINCT al.ip_address),
        AVG(al.response_time)::DECIMAL
    FROM access_logs AS al
    WHERE al.created_at >= p_start_date
      AND al.created_at < p_end_date + INTERVAL '1 day'
    GROUP BY al.path
    ORDER BY COUNT(*) DESC, al.path  -- path as tie-breaker for a stable order
    LIMIT p_limit;
END;
$$ LANGUAGE plpgsql;

-- Rank client addresses by request count on one day (anti-crawler helper).
--
-- Parameters:
--   p_date       calendar day to analyse
--   p_threshold  request count above which an address is flagged suspicious
-- Returns one row per address whose request count exceeds p_threshold / 10
-- (integer division), busiest first:
--   ip_address, access_count, is_suspicious (TRUE when access_count > p_threshold)
CREATE OR REPLACE FUNCTION get_ip_ranking(
    p_date DATE,
    p_threshold INTEGER DEFAULT 1000
) RETURNS TABLE(
    ip_address INET,
    access_count BIGINT,
    is_suspicious BOOLEAN
) AS $$
BEGIN
    -- Columns are alias-qualified: an unqualified "ip_address" is ambiguous
    -- between access_logs.ip_address and the OUT column of the same name.
    RETURN QUERY
    SELECT
        al.ip_address,
        COUNT(*),
        COUNT(*) > p_threshold
    FROM access_logs AS al
    WHERE al.created_at >= p_date
      AND al.created_at < p_date + INTERVAL '1 day'
    GROUP BY al.ip_address
    -- Only list addresses above 10% of the threshold; the suspicious flag
    -- itself is decided by the full threshold in the select list.
    HAVING COUNT(*) > p_threshold / 10
    ORDER BY COUNT(*) DESC;
END;
$$ LANGUAGE plpgsql;

-- Daily error and warning counts between p_start_date and p_end_date
-- (both inclusive; the upper bound is applied as "< end date + 1 day").
-- Returns one row per day that has at least one error_logs row, in
-- ascending date order:
--   date, error_count (level = 'ERROR'), warning_count (level = 'WARNING')
CREATE OR REPLACE FUNCTION get_error_trend(
    p_start_date DATE,
    p_end_date DATE
) RETURNS TABLE(
    date DATE,
    error_count BIGINT,
    warning_count BIGINT
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        CAST(el.created_at AS DATE),
        COUNT(*) FILTER (WHERE el.level = 'ERROR'),
        COUNT(*) FILTER (WHERE el.level = 'WARNING')
    FROM error_logs AS el
    WHERE el.created_at >= p_start_date
      AND el.created_at < p_end_date + INTERVAL '1 day'
    GROUP BY CAST(el.created_at AS DATE)
    ORDER BY CAST(el.created_at AS DATE);
END;
$$ LANGUAGE plpgsql;

数据库维护脚本

自动分区创建

sql
-- Create the partition of p_table_name that covers one calendar month.
-- The partition is named <table>_<year>_<two-digit month> and spans from the
-- first day of the month (inclusive) to the first day of the next month
-- (exclusive). Nothing happens if a table with that name already exists.
CREATE OR REPLACE FUNCTION create_monthly_partition(
    p_table_name VARCHAR,
    p_year INTEGER,
    p_month INTEGER
) RETURNS VOID AS $$
DECLARE
    v_month_start DATE := make_date(p_year, p_month, 1);
    v_next_month_start DATE := v_month_start + INTERVAL '1 month';
    v_child_name VARCHAR := p_table_name || '_' || p_year || '_' ||
                            LPAD(p_month::TEXT, 2, '0');
BEGIN
    -- %I quotes the identifiers and %L quotes the bound literals, so the
    -- dynamic statement is safe against injection through its arguments.
    EXECUTE format(
        'CREATE TABLE IF NOT EXISTS %I PARTITION OF %I 
         FOR VALUES FROM (%L) TO (%L)',
        v_child_name, p_table_name, v_month_start, v_next_month_start
    );
END;
$$ LANGUAGE plpgsql;

-- Create monthly partitions of p_table_name for the current month and the
-- following p_months months (p_months + 1 calls to create_monthly_partition
-- in total, because the loop starts at offset 0).
CREATE OR REPLACE FUNCTION create_future_partitions(
    p_table_name VARCHAR,
    p_months INTEGER DEFAULT 3
) RETURNS VOID AS $$
DECLARE
    v_base_date DATE := CURRENT_DATE;
    v_target_date DATE;
BEGIN
    FOR v_offset IN 0..p_months LOOP
        -- Each target month is computed from the base date, not from the
        -- previous iteration.
        v_target_date := v_base_date + v_offset * INTERVAL '1 month';

        PERFORM create_monthly_partition(
            p_table_name,
            EXTRACT(YEAR FROM v_target_date)::INTEGER,
            EXTRACT(MONTH FROM v_target_date)::INTEGER
        );
    END LOOP;
END;
$$ LANGUAGE plpgsql;

数据库健康检查

sql
-- Database health check: returns one row per check with a printable value
-- and a status of 'OK' or 'WARNING'.
--
-- Checks, in order:
--   connections        "<max_connections> / <current sessions>"; WARNING at
--                      80% of max_connections or more
--   db_size            pretty-printed size of the current database; always OK
--   dead_tuples        total n_dead_tup over user tables; WARNING above 10000
--   unused_indexes     never-scanned indexes that do not back a primary-key
--                      or unique constraint; WARNING above 10
--   long_transactions  client transactions open for more than 10 minutes,
--                      including 'idle in transaction'; WARNING above 5
--   replication_lag    byte lag of the most lagging replica; WARNING above
--                      10,000,000 bytes; no row when there are no replicas
CREATE OR REPLACE FUNCTION health_check()
RETURNS TABLE(
    check_name VARCHAR,
    check_value TEXT,
    status VARCHAR
) AS $$
BEGIN
    -- RETURN QUERY requires the query's column types to match the declared
    -- result types exactly. A bare CASE over string literals is "text", so
    -- every status expression is cast to VARCHAR explicitly.

    -- Connection usage.
    RETURN QUERY
    SELECT
        'connections'::VARCHAR,
        current_setting('max_connections') || ' / ' ||
        (SELECT COUNT(*) FROM pg_stat_activity)::TEXT,
        (CASE WHEN (SELECT COUNT(*) FROM pg_stat_activity) <
                   current_setting('max_connections')::INT * 0.8
              THEN 'OK' ELSE 'WARNING' END)::VARCHAR;

    -- Database size.
    RETURN QUERY
    SELECT
        'db_size'::VARCHAR,
        pg_size_pretty(pg_database_size(current_database())),
        'OK'::VARCHAR;

    -- Dead tuples. SUM over zero tables is NULL, hence the COALESCE.
    RETURN QUERY
    SELECT
        'dead_tuples'::VARCHAR,
        COALESCE(SUM(t.n_dead_tup), 0)::TEXT,
        (CASE WHEN COALESCE(SUM(t.n_dead_tup), 0) > 10000
              THEN 'WARNING' ELSE 'OK' END)::VARCHAR
    FROM pg_stat_user_tables AS t;

    -- Unused indexes, ignoring those that enforce PK/UNIQUE constraints.
    RETURN QUERY
    SELECT
        'unused_indexes'::VARCHAR,
        COUNT(*)::TEXT,
        (CASE WHEN COUNT(*) > 10 THEN 'WARNING' ELSE 'OK' END)::VARCHAR
    FROM pg_stat_user_indexes AS i
    WHERE i.idx_scan = 0
      AND NOT EXISTS (
          SELECT 1
          FROM pg_constraint AS c
          WHERE c.conindid = i.indexrelid
            AND c.contype IN ('p', 'u')
      );

    -- Long transactions. Age is measured from xact_start (transaction
    -- start), not query_start, and 'idle in transaction' sessions are
    -- included: they hold an open transaction and are what this check is for.
    RETURN QUERY
    SELECT
        'long_transactions'::VARCHAR,
        COUNT(*)::TEXT,
        (CASE WHEN COUNT(*) > 5 THEN 'WARNING' ELSE 'OK' END)::VARCHAR
    FROM pg_stat_activity AS a
    WHERE a.backend_type = 'client backend'
      AND a.xact_start IS NOT NULL
      AND a.xact_start < NOW() - INTERVAL '10 minutes';

    -- Replication lag of the worst replica. replay_lsn can be NULL while a
    -- replica is starting up, so the difference is defaulted to 0.
    RETURN QUERY
    SELECT
        'replication_lag'::VARCHAR,
        COALESCE(pg_wal_lsn_diff(pg_current_wal_lsn(), r.replay_lsn), 0)::TEXT || ' bytes',
        (CASE WHEN pg_wal_lsn_diff(pg_current_wal_lsn(), r.replay_lsn) > 10000000
              THEN 'WARNING' ELSE 'OK' END)::VARCHAR
    FROM pg_stat_replication AS r
    ORDER BY pg_wal_lsn_diff(pg_current_wal_lsn(), r.replay_lsn) DESC NULLS LAST
    LIMIT 1;
END;
$$ LANGUAGE plpgsql;

基于 MIT 许可发布