PostgreSQL 实战案例
用户管理系统
需求分析
- 用户注册、登录、修改密码
- 用户角色管理
- 用户状态管理
表设计
sql
-- Roles: lookup table of the roles a user can be assigned.
CREATE TABLE roles (
    id          SERIAL       PRIMARY KEY,
    name        VARCHAR(50)  NOT NULL UNIQUE,   -- role name; no two roles may share it
    description TEXT,                           -- optional free-form description
    created_at  TIMESTAMPTZ  DEFAULT NOW()      -- filled in on insert when omitted
);
-- Users: one row per account.
--   * username and email are each unique.
--   * role_id optionally points at roles(id).
--   * status is restricted to 'active' / 'inactive' / 'banned'. It is declared
--     NOT NULL because a CHECK constraint is satisfied by NULL, so without it a
--     row could carry no status at all and never match "status = 'active'".
--   * created_at / updated_at are NOT NULL so every row carries both timestamps.
CREATE TABLE users (
    id            SERIAL        PRIMARY KEY,
    username      VARCHAR(50)   NOT NULL UNIQUE,
    email         VARCHAR(100)  NOT NULL UNIQUE,
    password_hash VARCHAR(255)  NOT NULL,
    role_id       INTEGER       REFERENCES roles(id),
    status        VARCHAR(20)   NOT NULL DEFAULT 'active'
                  CHECK (status IN ('active', 'inactive', 'banned')),
    last_login_at TIMESTAMPTZ,                       -- NULL until the first login
    created_at    TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    updated_at    TIMESTAMPTZ   NOT NULL DEFAULT NOW()
);
-- uuid_generate_v4() is provided by the uuid-ossp extension, not by core
-- PostgreSQL, so make sure the extension exists before it is used as a default.
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

-- User sessions: one row per issued login token.
--   * Rows are removed automatically when the owning user is deleted
--     (ON DELETE CASCADE).
--   * user_id is NOT NULL: a session that belongs to nobody is meaningless.
--   * expires_at is mandatory; nothing in this table enforces expiry by itself.
CREATE TABLE user_sessions (
    id         UUID          PRIMARY KEY DEFAULT uuid_generate_v4(),
    user_id    INTEGER       NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    token      VARCHAR(255)  NOT NULL UNIQUE,
    ip_address INET,
    user_agent TEXT,
    expires_at TIMESTAMPTZ   NOT NULL,
    created_at TIMESTAMPTZ   DEFAULT NOW()
);
-- Secondary indexes for the user-management tables.
-- users(email) and user_sessions(token) deliberately get no extra index here:
-- both columns are declared UNIQUE, and PostgreSQL backs every UNIQUE
-- constraint with its own unique B-tree index on those columns. A second
-- index on the same column only adds write and storage overhead.
CREATE INDEX idx_users_status ON users(status);
CREATE INDEX idx_sessions_user_id ON user_sessions(user_id);
CREATE INDEX idx_sessions_expires ON user_sessions(expires_at);

基础功能实现
sql
-- Trigger function used by the update_*_timestamp triggers in this file.
-- It stamps the row being written with the current time in updated_at.
-- NOTE(review): no definition of update_timestamp() is visible in this file;
-- if one already exists elsewhere in the project, confirm it matches before
-- letting CREATE OR REPLACE overwrite it.
CREATE OR REPLACE FUNCTION update_timestamp() RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at := NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Keep users.updated_at current on every row update.
CREATE TRIGGER update_users_timestamp
    BEFORE UPDATE ON users
    FOR EACH ROW EXECUTE FUNCTION update_timestamp();
-- Register a new user and return the generated users.id.
--
-- Parameters:
--   p_username  login name; must not already exist
--   p_email     e-mail address; must not already exist
--   p_password  stored as-is in users.password_hash, so the caller is expected
--               to pass an already-hashed value
--   p_role_id   optional roles.id
--
-- Raises:
--   '用户名已存在'  when the username is taken
--   '邮箱已被注册'  when the e-mail is taken
CREATE OR REPLACE FUNCTION register_user(
    p_username VARCHAR,
    p_email VARCHAR,
    p_password VARCHAR,
    p_role_id INTEGER DEFAULT NULL
) RETURNS INTEGER AS $$
DECLARE
    v_user_id    INTEGER;
    v_constraint TEXT;
BEGIN
    -- Fast path: give a friendly error for the common, non-concurrent case.
    IF EXISTS (SELECT 1 FROM users AS u WHERE u.username = p_username) THEN
        RAISE EXCEPTION '用户名已存在';
    END IF;
    IF EXISTS (SELECT 1 FROM users AS u WHERE u.email = p_email) THEN
        RAISE EXCEPTION '邮箱已被注册';
    END IF;

    -- Two concurrent registrations can both pass the checks above; the UNIQUE
    -- constraints are the real guard, so translate their violation into the
    -- same messages instead of leaking a raw constraint error.
    BEGIN
        INSERT INTO users (username, email, password_hash, role_id)
        VALUES (p_username, p_email, p_password, p_role_id)
        RETURNING id INTO v_user_id;
    EXCEPTION
        WHEN unique_violation THEN
            GET STACKED DIAGNOSTICS v_constraint = CONSTRAINT_NAME;
            IF v_constraint LIKE '%username%' THEN
                RAISE EXCEPTION '用户名已存在';
            ELSIF v_constraint LIKE '%email%' THEN
                RAISE EXCEPTION '邮箱已被注册';
            ELSE
                RAISE;  -- some other unique constraint: re-raise unchanged
            END IF;
    END;

    RETURN v_user_id;
END;
$$ LANGUAGE plpgsql;
-- Authenticate a user and open a session.
--
-- Parameters:
--   p_username    login name; only users with status 'active' can log in
--   p_password    compared directly with users.password_hash (no hashing is
--                 done here, so the caller must pass the value in stored form)
--   p_ip          optional client address recorded on the session
--   p_user_agent  optional client user agent recorded on the session
--
-- Returns one row: the new session token and its expiry (7 days from now).
-- Raises '用户名或密码错误' for an unknown/inactive user or a wrong password.
CREATE OR REPLACE FUNCTION login_user(
    p_username VARCHAR,
    p_password VARCHAR,
    p_ip INET DEFAULT NULL,
    p_user_agent TEXT DEFAULT NULL
) RETURNS TABLE(token UUID, expires_at TIMESTAMPTZ) AS $$
DECLARE
    v_user_id INTEGER;
    v_password_hash VARCHAR;
    v_session_token UUID;
    v_expires_at TIMESTAMPTZ;
BEGIN
    -- Look up the active user.
    SELECT u.id, u.password_hash INTO v_user_id, v_password_hash
    FROM users AS u
    WHERE u.username = p_username AND u.status = 'active';
    IF NOT FOUND THEN
        RAISE EXCEPTION '用户名或密码错误';
    END IF;

    -- NULL-safe comparison. With "!=" a NULL p_password makes the condition
    -- NULL, the IF is skipped, and the login would succeed without a password.
    IF p_password IS NULL OR v_password_hash IS DISTINCT FROM p_password THEN
        RAISE EXCEPTION '用户名或密码错误';
    END IF;

    -- Record the login time.
    UPDATE users AS u SET last_login_at = NOW() WHERE u.id = v_user_id;

    -- Create the session.
    v_session_token := uuid_generate_v4();
    v_expires_at := NOW() + INTERVAL '7 days';
    INSERT INTO user_sessions (user_id, token, ip_address, user_agent, expires_at)
    VALUES (v_user_id, v_session_token, p_ip, p_user_agent, v_expires_at);

    RETURN QUERY SELECT v_session_token, v_expires_at;
END;
$$ LANGUAGE plpgsql;
-- Change a user's password and drop all of that user's sessions.
--
-- Parameters:
--   p_user_id       users.id of the account
--   p_old_password  must equal the currently stored users.password_hash
--   p_new_password  stored as-is in users.password_hash
--
-- Returns TRUE on success.
-- Raises '用户不存在' for an unknown user id and '原密码错误' when the old
-- password does not match.
CREATE OR REPLACE FUNCTION change_password(
    p_user_id INTEGER,
    p_old_password VARCHAR,
    p_new_password VARCHAR
) RETURNS BOOLEAN AS $$
DECLARE
    v_old_hash VARCHAR;
BEGIN
    -- Lock the row so two concurrent password changes are serialized.
    SELECT u.password_hash INTO v_old_hash
    FROM users AS u
    WHERE u.id = p_user_id
    FOR UPDATE;

    -- Without this check an unknown id left v_old_hash NULL, the comparison
    -- below evaluated to NULL, and the function reported success.
    IF NOT FOUND THEN
        RAISE EXCEPTION '用户不存在';
    END IF;

    -- NULL-safe comparison: "!=" against a NULL p_old_password yields NULL and
    -- would skip the check entirely.
    IF p_old_password IS NULL OR v_old_hash IS DISTINCT FROM p_old_password THEN
        RAISE EXCEPTION '原密码错误';
    END IF;

    UPDATE users AS u SET password_hash = p_new_password WHERE u.id = p_user_id;

    -- Invalidate every session of this user.
    DELETE FROM user_sessions AS s WHERE s.user_id = p_user_id;

    RETURN TRUE;
END;
$$ LANGUAGE plpgsql;
-- Paginated user listing.
--
-- Parameters:
--   p_page       1-based page number; values below 1 (or NULL) are treated as 1
--   p_page_size  rows per page; values below 1 (or NULL) fall back to 1 / 20
--   p_status     optional status filter; NULL means "all users"
--
-- Returns one row:
--   total      number of users matching the filter
--   page       the page number actually used
--   page_size  the page size actually used
--   data       JSON array of {id, username, email, role_name, status, created_at},
--              newest first; an empty array when the page has no rows
CREATE OR REPLACE FUNCTION get_users_list(
    p_page INTEGER DEFAULT 1,
    p_page_size INTEGER DEFAULT 20,
    p_status VARCHAR DEFAULT NULL
) RETURNS TABLE(
    total BIGINT,
    page INTEGER,
    page_size INTEGER,
    data JSON
) AS $$
DECLARE
    -- Clamp the inputs: a negative OFFSET or LIMIT is a runtime error.
    v_page      INTEGER := GREATEST(COALESCE(p_page, 1), 1);
    v_page_size INTEGER := GREATEST(COALESCE(p_page_size, 20), 1);
    v_total     BIGINT;
BEGIN
    -- One count that honours the optional filter (the unfiltered count that
    -- used to run first was always thrown away when a filter was given).
    SELECT COUNT(*) INTO v_total
    FROM users AS u
    WHERE p_status IS NULL OR u.status = p_status;

    RETURN QUERY
    SELECT
        v_total,
        v_page,
        v_page_size,
        -- json_agg over zero rows is NULL; callers get an empty array instead.
        COALESCE(json_agg(row_to_json(t)), '[]'::JSON)
    FROM (
        SELECT u.id, u.username, u.email, r.name AS role_name, u.status, u.created_at
        FROM users AS u
        LEFT JOIN roles AS r ON u.role_id = r.id
        WHERE (p_status IS NULL OR u.status = p_status)
        -- id is the tie-breaker so rows with equal created_at do not jump
        -- between pages.
        ORDER BY u.created_at DESC, u.id DESC
        LIMIT v_page_size OFFSET (v_page - 1) * v_page_size
    ) AS t;
END;
$$ LANGUAGE plpgsql;

订单系统
表设计
sql
-- Products: catalogue entries with price and stock on hand.
-- Neither price nor stock may be negative.
CREATE TABLE products (
    id          SERIAL         PRIMARY KEY,
    name        VARCHAR(200)   NOT NULL,
    description TEXT,
    price       DECIMAL(10,2)  NOT NULL CHECK (price >= 0),
    stock       INTEGER        NOT NULL DEFAULT 0 CHECK (stock >= 0),
    category_id INTEGER,                         -- no foreign key is declared on this column
    status      VARCHAR(20)    DEFAULT 'active',
    created_at  TIMESTAMPTZ    DEFAULT NOW(),
    updated_at  TIMESTAMPTZ    DEFAULT NOW()
);
-- Orders: one row per order placed by a user.
-- status starts at 'pending' and is limited to the listed lifecycle values;
-- paid_at / shipped_at / delivered_at stay NULL until set by the application.
CREATE TABLE orders (
    id           SERIAL         PRIMARY KEY,
    user_id      INTEGER        NOT NULL REFERENCES users(id),
    order_no     VARCHAR(50)    NOT NULL UNIQUE,
    status       VARCHAR(20)    DEFAULT 'pending'
                 CHECK (status IN ('pending', 'paid', 'shipped', 'delivered', 'cancelled', 'refunded')),
    total_amount DECIMAL(10,2)  NOT NULL CHECK (total_amount >= 0),
    paid_at      TIMESTAMPTZ,
    shipped_at   TIMESTAMPTZ,
    delivered_at TIMESTAMPTZ,
    created_at   TIMESTAMPTZ    DEFAULT NOW(),
    updated_at   TIMESTAMPTZ    DEFAULT NOW()
);
-- Order items: the lines of an order. Deleting an order deletes its lines.
-- quantity must be positive; unit_price and subtotal must not be negative.
CREATE TABLE order_items (
    id         SERIAL         PRIMARY KEY,
    order_id   INTEGER        NOT NULL REFERENCES orders(id) ON DELETE CASCADE,
    product_id INTEGER        NOT NULL REFERENCES products(id),
    quantity   INTEGER        NOT NULL CHECK (quantity > 0),
    unit_price DECIMAL(10,2)  NOT NULL CHECK (unit_price >= 0),
    subtotal   DECIMAL(10,2)  NOT NULL CHECK (subtotal >= 0),
    created_at TIMESTAMPTZ    DEFAULT NOW()
);
-- Stock logs: audit trail of stock movements per product.
-- order_id is optional; change_type is one of 'increase' / 'decrease' / 'lock'.
CREATE TABLE stock_logs (
    id          SERIAL        PRIMARY KEY,
    product_id  INTEGER       NOT NULL REFERENCES products(id),
    order_id    INTEGER       REFERENCES orders(id),
    change_type VARCHAR(20)   NOT NULL
                CHECK (change_type IN ('increase', 'decrease', 'lock')),
    quantity    INTEGER       NOT NULL,          -- no sign constraint is declared
    reason      VARCHAR(200),
    created_at  TIMESTAMPTZ   DEFAULT NOW()
);
-- Lookup indexes for the order tables (B-tree, the default access method).
CREATE INDEX idx_orders_user_id
    ON orders USING btree (user_id);
CREATE INDEX idx_orders_status
    ON orders USING btree (status);
CREATE INDEX idx_orders_created
    ON orders USING btree (created_at DESC);      -- newest-first listings
CREATE INDEX idx_order_items_order_id
    ON order_items USING btree (order_id);
CREATE INDEX idx_order_items_product_id
    ON order_items USING btree (product_id);
CREATE INDEX idx_stock_logs_product_id
    ON stock_logs USING btree (product_id);
-- Row-level BEFORE UPDATE triggers: both call update_timestamp() for every
-- updated row of products and orders respectively.
CREATE TRIGGER update_products_timestamp
    BEFORE UPDATE
    ON products
    FOR EACH ROW
    EXECUTE FUNCTION update_timestamp();

CREATE TRIGGER update_orders_timestamp
    BEFORE UPDATE
    ON orders
    FOR EACH ROW
    EXECUTE FUNCTION update_timestamp();

核心功能
sql
-- Sequence backing the numeric suffix of generated order numbers.
CREATE SEQUENCE IF NOT EXISTS order_seq;

-- Create an order, reserving stock for every requested item. Everything runs
-- in the caller's transaction, so any exception undoes all changes.
--
-- Parameters:
--   p_user_id  users.id of the buyer
--   p_items    JSONB array of {"product_id": <int>, "quantity": <int>}
--
-- Returns one row: the new orders.id and the generated order number.
--
-- Raises:
--   '订单明细不能为空'        p_items is NULL, not an array, or empty
--   '订单明细无效: %'         an item lacks product_id/quantity or quantity <= 0
--   '商品不存在或已下架: %'   product missing or not 'active'
--   '库存不足: 商品ID %'      not enough stock
--
-- Notes:
--   * Column references are table-qualified throughout because the OUT
--     columns order_id / order_no are also PL/pgSQL variables; a bare
--     "order_id" in an expression would be ambiguous.
--   * Items are processed in product_id order so that concurrent orders take
--     their product row locks in the same order.
CREATE OR REPLACE FUNCTION create_order(
    p_user_id INTEGER,
    p_items JSONB -- [{product_id: 1, quantity: 2}, ...]
) RETURNS TABLE(order_id INTEGER, order_no VARCHAR) AS $$
DECLARE
    v_order_id   INTEGER;
    v_order_no   VARCHAR(50);
    v_item       JSONB;
    v_product_id INTEGER;
    v_quantity   INTEGER;
    v_unit_price DECIMAL(10,2);
    v_stock      INTEGER;
    v_subtotal   DECIMAL(10,2);
    v_total      DECIMAL(10,2) := 0;
BEGIN
    IF p_items IS NULL OR jsonb_typeof(p_items) <> 'array' THEN
        RAISE EXCEPTION '订单明细不能为空';
    END IF;
    IF jsonb_array_length(p_items) = 0 THEN
        RAISE EXCEPTION '订单明细不能为空';
    END IF;

    -- Order number: ORD + timestamp + 6-digit sequence suffix. The modulo keeps
    -- the suffix at six digits; LPAD alone would cut longer values down to
    -- their first six digits and make neighbouring values collide.
    v_order_no := 'ORD' || TO_CHAR(NOW(), 'YYYYMMDDHH24MISS') ||
                  LPAD((NEXTVAL('order_seq') % 1000000)::TEXT, 6, '0');

    -- Create the order header first so item and stock-log rows can reference
    -- it directly; the real total is written once all items are processed.
    INSERT INTO orders (user_id, order_no, total_amount, status)
    VALUES (p_user_id, v_order_no, 0, 'pending')
    RETURNING id INTO v_order_id;

    FOR v_item IN
        SELECT item.elem
        FROM jsonb_array_elements(p_items) AS item(elem)
        ORDER BY (item.elem->>'product_id')::INTEGER
    LOOP
        v_product_id := (v_item->>'product_id')::INTEGER;
        v_quantity   := (v_item->>'quantity')::INTEGER;

        IF v_product_id IS NULL OR v_quantity IS NULL OR v_quantity <= 0 THEN
            RAISE EXCEPTION '订单明细无效: %', v_item;
        END IF;

        -- Lock the product row, then read price and stock under that lock.
        SELECT p.price, p.stock INTO v_unit_price, v_stock
        FROM products AS p
        WHERE p.id = v_product_id AND p.status = 'active'
        FOR UPDATE;

        IF NOT FOUND THEN
            RAISE EXCEPTION '商品不存在或已下架: %', v_product_id;
        END IF;
        IF v_stock < v_quantity THEN
            RAISE EXCEPTION '库存不足: 商品ID %', v_product_id;
        END IF;

        -- Deduct stock.
        UPDATE products AS p
        SET stock = p.stock - v_quantity
        WHERE p.id = v_product_id;

        -- Log the movement against this order right away (no later back-fill
        -- by "order_id IS NULL", which could also pick up unrelated log rows).
        INSERT INTO stock_logs (product_id, order_id, change_type, quantity, reason)
        VALUES (v_product_id, v_order_id, 'lock', -v_quantity, '创建订单 ' || v_order_no);

        -- Order line, priced with the value read under the row lock.
        v_subtotal := v_unit_price * v_quantity;
        INSERT INTO order_items (order_id, product_id, quantity, unit_price, subtotal)
        VALUES (v_order_id, v_product_id, v_quantity, v_unit_price, v_subtotal);

        v_total := v_total + v_subtotal;
    END LOOP;

    UPDATE orders AS o
    SET total_amount = v_total
    WHERE o.id = v_order_id;

    RETURN QUERY SELECT v_order_id, v_order_no;
END;
$$ LANGUAGE plpgsql;
-- Mark a pending order as paid.
--
-- Locks the order row, then requires its status to be 'pending' before
-- setting status = 'paid' and paid_at = NOW(). Returns TRUE on success.
-- Raises '订单不存在' for an unknown id and '订单状态不允许支付: %' for any
-- other current status.
CREATE OR REPLACE FUNCTION pay_order(p_order_id INTEGER) RETURNS BOOLEAN AS $$
DECLARE
    v_current_status orders.status%TYPE;
BEGIN
    -- Only the status is needed; FOR UPDATE still locks the whole row.
    SELECT o.status
      INTO v_current_status
      FROM orders AS o
     WHERE o.id = p_order_id
       FOR UPDATE;

    IF NOT FOUND THEN
        RAISE EXCEPTION '订单不存在';
    END IF;

    IF v_current_status <> 'pending' THEN
        RAISE EXCEPTION '订单状态不允许支付: %', v_current_status;
    END IF;

    UPDATE orders AS o
       SET status  = 'paid',
           paid_at = NOW()
     WHERE o.id = p_order_id;

    RETURN TRUE;
END;
$$ LANGUAGE plpgsql;
-- Cancel an order and give its stock back.
--
-- Allowed only while the order is 'pending' or 'paid'. Restores the ordered
-- quantities to products.stock, writes one 'increase' stock log per order
-- line, and sets the order status to 'cancelled'. Returns TRUE on success.
-- Raises '订单不存在' for an unknown id and '订单状态不允许取消: %' for any
-- other current status.
CREATE OR REPLACE FUNCTION cancel_order(p_order_id INTEGER) RETURNS BOOLEAN AS $$
DECLARE
    v_order orders%ROWTYPE;
BEGIN
    SELECT * INTO v_order FROM orders WHERE id = p_order_id FOR UPDATE;
    IF NOT FOUND THEN
        RAISE EXCEPTION '订单不存在';
    END IF;
    IF v_order.status NOT IN ('pending', 'paid') THEN
        RAISE EXCEPTION '订单状态不允许取消: %', v_order.status;
    END IF;

    -- Restore stock. Quantities are summed per product first: UPDATE ... FROM
    -- applies only one joined row to each target row, so an order holding two
    -- lines for the same product would otherwise get only one of them back.
    UPDATE products AS p
    SET stock = p.stock + r.returned_qty
    FROM (
        SELECT oi.product_id, SUM(oi.quantity)::INTEGER AS returned_qty
        FROM order_items AS oi
        WHERE oi.order_id = p_order_id
        GROUP BY oi.product_id
    ) AS r
    WHERE p.id = r.product_id;

    -- Log the restored stock, one row per order line.
    INSERT INTO stock_logs (product_id, order_id, change_type, quantity, reason)
    SELECT oi.product_id, p_order_id, 'increase', oi.quantity, '取消订单'
    FROM order_items AS oi
    WHERE oi.order_id = p_order_id;

    UPDATE orders AS o SET status = 'cancelled' WHERE o.id = p_order_id;
    RETURN TRUE;
END;
$$ LANGUAGE plpgsql;
-- Order statistics for an inclusive date range.
--
-- Parameters:
--   p_start_date  first day of the range
--   p_end_date    last day of the range (the whole day is included)
--
-- Returns exactly one row:
--   total_orders  number of orders created in the range
--   total_amount  sum of orders.total_amount (0 when there are no orders)
--   avg_amount    average of orders.total_amount (0 when there are no orders)
--   by_status     JSON object {status: order count}; '{}' when there are no
--                 orders; a NULL status is reported under the key 'unknown'
--
-- The totals and the per-status breakdown are computed separately: grouping
-- the outer query by status would return one row per status instead of
-- range-wide totals. Columns are qualified/renamed because total_amount is
-- also an OUT variable of this function.
CREATE OR REPLACE FUNCTION get_order_stats(
    p_start_date DATE,
    p_end_date DATE
) RETURNS TABLE(
    total_orders BIGINT,
    total_amount DECIMAL,
    avg_amount DECIMAL,
    by_status JSON
) AS $$
BEGIN
    RETURN QUERY
    WITH in_range AS (
        -- Orders created inside the half-open range [start, end + 1 day).
        SELECT o.status AS order_status, o.total_amount AS amount
        FROM orders AS o
        WHERE o.created_at >= p_start_date
          AND o.created_at < p_end_date + INTERVAL '1 day'
    ),
    per_status AS (
        SELECT r.order_status, COUNT(*) AS order_count
        FROM in_range AS r
        GROUP BY r.order_status
    )
    SELECT
        (SELECT COUNT(*) FROM in_range),
        (SELECT COALESCE(SUM(r.amount), 0) FROM in_range AS r),
        (SELECT COALESCE(AVG(r.amount), 0) FROM in_range AS r),
        (SELECT COALESCE(
                    json_object_agg(COALESCE(s.order_status, 'unknown'), s.order_count),
                    '{}'::JSON)
         FROM per_status AS s);
END;
$$ LANGUAGE plpgsql;

日志分析系统
表设计
sql
-- Access log, range-partitioned on created_at.
-- Rows are routed to the partition whose range contains created_at.
CREATE TABLE access_logs (
    id            BIGSERIAL,
    ip_address    INET,
    user_id       INTEGER,
    method        VARCHAR(10),
    path          TEXT,
    status_code   INTEGER,
    response_time INTEGER,                     -- milliseconds
    user_agent    TEXT,
    referer       TEXT,
    created_at    TIMESTAMPTZ DEFAULT NOW()
)
PARTITION BY RANGE (created_at);

-- Monthly partitions; each range includes its lower bound and excludes its
-- upper bound.
CREATE TABLE access_logs_2024_01
    PARTITION OF access_logs
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

CREATE TABLE access_logs_2024_02
    PARTITION OF access_logs
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
-- Error log: application errors with optional stack trace and request id.
CREATE TABLE error_logs (
    id          BIGSERIAL    PRIMARY KEY,
    level       VARCHAR(20),
    message     TEXT,
    stack_trace TEXT,
    user_id     INTEGER,
    request_id  UUID,
    created_at  TIMESTAMPTZ  DEFAULT NOW()
);

-- Indexes on level and on created_at (newest first).
CREATE INDEX idx_error_logs_level
    ON error_logs (level);
CREATE INDEX idx_error_logs_created
    ON error_logs (created_at DESC);

分析功能
sql
-- Page-view / unique-visitor statistics for one calendar day.
--
-- Returns one row:
--   pv                 number of access_logs rows created on p_date
--   uv                 number of distinct ip_address values among them
--   avg_response_time  average response_time, NULL when the day has no rows
CREATE OR REPLACE FUNCTION get_access_stats(
    p_date DATE
) RETURNS TABLE(
    pv BIGINT,
    uv BIGINT,
    avg_response_time DECIMAL
) AS $$
BEGIN
    -- Fill the output columns directly, then emit them as the single result row.
    SELECT
        COUNT(*),
        COUNT(DISTINCT al.ip_address),
        CAST(AVG(al.response_time) AS DECIMAL)
    INTO pv, uv, avg_response_time
    FROM access_logs AS al
    WHERE al.created_at >= p_date
      AND al.created_at < p_date + INTERVAL '1 day';

    RETURN NEXT;
END;
$$ LANGUAGE plpgsql;
-- Most visited paths in an inclusive date range.
--
-- Parameters:
--   p_start_date  first day of the range
--   p_end_date    last day of the range (the whole day is included)
--   p_limit       maximum number of paths returned (default 10)
--
-- Returns one row per path, highest page-view count first:
--   path, pv (row count), uv (distinct ip_address), avg_response_time
--
-- Every column reference is qualified with the table alias: "path" is also an
-- OUT variable of this function, so a bare "path" is rejected at run time as
-- an ambiguous reference.
CREATE OR REPLACE FUNCTION get_popular_pages(
    p_start_date DATE,
    p_end_date DATE,
    p_limit INTEGER DEFAULT 10
) RETURNS TABLE(
    path TEXT,
    pv BIGINT,
    uv BIGINT,
    avg_response_time DECIMAL
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        al.path,
        COUNT(*),
        COUNT(DISTINCT al.ip_address),
        AVG(al.response_time)::DECIMAL
    FROM access_logs AS al
    WHERE al.created_at >= p_start_date
      AND al.created_at < p_end_date + INTERVAL '1 day'
    GROUP BY al.path
    ORDER BY COUNT(*) DESC
    LIMIT p_limit;
END;
$$ LANGUAGE plpgsql;
-- Per-IP request ranking for one day (intended for crawler detection).
--
-- Parameters:
--   p_date       the day to analyse
--   p_threshold  request count above which an IP is flagged (default 1000)
--
-- Returns one row per IP, busiest first:
--   ip_address, access_count, is_suspicious
--
-- Only IPs with more than p_threshold / 10 requests are listed (integer
-- division); is_suspicious is TRUE only above the full threshold.
--
-- Column references are qualified with the table alias: "ip_address" is also
-- an OUT variable of this function, so a bare reference is ambiguous and
-- fails at run time.
CREATE OR REPLACE FUNCTION get_ip_ranking(
    p_date DATE,
    p_threshold INTEGER DEFAULT 1000
) RETURNS TABLE(
    ip_address INET,
    access_count BIGINT,
    is_suspicious BOOLEAN
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        al.ip_address,
        COUNT(*),
        COUNT(*) > p_threshold
    FROM access_logs AS al
    WHERE al.created_at >= p_date
      AND al.created_at < p_date + INTERVAL '1 day'
    GROUP BY al.ip_address
    HAVING COUNT(*) > p_threshold / 10  -- listing cutoff: one tenth of the threshold
    ORDER BY COUNT(*) DESC;
END;
$$ LANGUAGE plpgsql;
-- Daily counts of 'ERROR' and 'WARNING' rows in error_logs for an inclusive
-- date range, ordered by day. Days without any log row produce no output row.
CREATE OR REPLACE FUNCTION get_error_trend(
    p_start_date DATE,
    p_end_date DATE
) RETURNS TABLE(
    date DATE,
    error_count BIGINT,
    warning_count BIGINT
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        CAST(el.created_at AS DATE),
        COUNT(*) FILTER (WHERE el.level = 'ERROR'),
        COUNT(*) FILTER (WHERE el.level = 'WARNING')
    FROM error_logs AS el
    WHERE el.created_at >= p_start_date
      AND el.created_at < p_end_date + INTERVAL '1 day'
    GROUP BY CAST(el.created_at AS DATE)
    ORDER BY CAST(el.created_at AS DATE);
END;
$$ LANGUAGE plpgsql;

数据库维护脚本
自动分区创建
sql
-- Create the monthly range partition "<table>_<year>_<MM>" of a partitioned
-- table if it does not exist yet.
--
-- Parameters:
--   p_table_name  name of the partitioned parent table
--   p_year        calendar year of the partition
--   p_month       calendar month (1-12) of the partition
--
-- The partition covers the first day of the month up to, but not including,
-- the first day of the following month.
CREATE OR REPLACE FUNCTION create_monthly_partition(
    p_table_name VARCHAR,
    p_year INTEGER,
    p_month INTEGER
) RETURNS VOID AS $$
DECLARE
    v_child      VARCHAR;
    v_range_from DATE;
    v_range_to   DATE;
BEGIN
    -- Child name: parent name, year, and zero-padded two-digit month.
    v_child := p_table_name || '_' || p_year || '_' ||
               LPAD(p_month::TEXT, 2, '0');

    v_range_from := make_date(p_year, p_month, 1);
    v_range_to   := v_range_from + INTERVAL '1 month';

    -- %I quotes identifiers and %L quotes literals, so the names and dates are
    -- safely embedded in the dynamic statement.
    EXECUTE format(
'CREATE TABLE IF NOT EXISTS %I PARTITION OF %I
FOR VALUES FROM (%L) TO (%L)',
        v_child, p_table_name, v_range_from, v_range_to
    );
END;
$$ LANGUAGE plpgsql;
-- Create monthly partitions for the current month and the following
-- p_months months by calling create_monthly_partition once per month.
--
-- Parameters:
--   p_table_name  name of the partitioned parent table
--   p_months      how many months ahead of the current one to cover (default 3)
CREATE OR REPLACE FUNCTION create_future_partitions(
    p_table_name VARCHAR,
    p_months INTEGER DEFAULT 3
) RETURNS VOID AS $$
DECLARE
    v_today  DATE := CURRENT_DATE;
    v_target DATE;
BEGIN
    -- Step 0 is the current month, so p_months + 1 partitions are requested.
    FOR v_step IN 0..p_months LOOP
        v_target := v_today + make_interval(months => v_step);

        PERFORM create_monthly_partition(
            p_table_name,
            EXTRACT(YEAR FROM v_target)::INTEGER,
            EXTRACT(MONTH FROM v_target)::INTEGER
        );
    END LOOP;
END;
$$ LANGUAGE plpgsql;

数据库健康检查
sql
-- Database health check.
--
-- Returns one row per check (six rows in total):
--   check_name   identifier of the check
--   check_value  human-readable measured value
--   status       'OK' or 'WARNING'
--
-- Checks:
--   connections        "max_connections / current sessions"; WARNING from 80 %
--   db_size            size of the current database; always OK
--   dead_tuples        dead tuples over all user tables; WARNING above 10000
--   unused_indexes     never-scanned indexes that do not back a primary key or
--                      unique constraint; WARNING above 10
--   long_transactions  transactions open for over 10 minutes; WARNING above 5
--   replication_lag    largest replay lag in bytes over all replicas
--                      (0 when there are none); WARNING above 10000000
--
-- Every status expression is cast to VARCHAR: RETURN QUERY requires the query
-- column types to match the declared result types exactly, and a CASE over
-- string literals yields text.
CREATE OR REPLACE FUNCTION health_check()
RETURNS TABLE(
    check_name VARCHAR,
    check_value TEXT,
    status VARCHAR
) AS $$
BEGIN
    -- Connections in use versus the configured maximum.
    RETURN QUERY
    SELECT
        'connections'::VARCHAR,
        current_setting('max_connections') || ' / ' ||
            (SELECT COUNT(*) FROM pg_stat_activity)::TEXT,
        (CASE WHEN (SELECT COUNT(*) FROM pg_stat_activity) <
                   current_setting('max_connections')::INT * 0.8
              THEN 'OK' ELSE 'WARNING' END)::VARCHAR;

    -- Database size (informational only).
    RETURN QUERY
    SELECT
        'db_size'::VARCHAR,
        pg_size_pretty(pg_database_size(current_database())),
        'OK'::VARCHAR;

    -- Dead tuples. SUM over zero tables is NULL, hence the COALESCE.
    RETURN QUERY
    SELECT
        'dead_tuples'::VARCHAR,
        COALESCE(SUM(t.n_dead_tup), 0)::TEXT,
        (CASE WHEN COALESCE(SUM(t.n_dead_tup), 0) > 10000
              THEN 'WARNING' ELSE 'OK' END)::VARCHAR
    FROM pg_stat_user_tables AS t;

    -- Indexes that were never scanned, ignoring those that back constraints.
    RETURN QUERY
    SELECT
        'unused_indexes'::VARCHAR,
        COUNT(*)::TEXT,
        (CASE WHEN COUNT(*) > 10 THEN 'WARNING' ELSE 'OK' END)::VARCHAR
    FROM pg_stat_user_indexes AS i
    WHERE i.idx_scan = 0
      AND NOT EXISTS (
            SELECT 1
            FROM pg_constraint AS c
            WHERE c.conindid = i.indexrelid
              AND c.contype IN ('p', 'u')
      );

    -- Long transactions. xact_start is the transaction start time, so sessions
    -- sitting "idle in transaction" are counted too; filtering on state and
    -- query_start would miss exactly those.
    RETURN QUERY
    SELECT
        'long_transactions'::VARCHAR,
        COUNT(*)::TEXT,
        (CASE WHEN COUNT(*) > 5 THEN 'WARNING' ELSE 'OK' END)::VARCHAR
    FROM pg_stat_activity AS a
    WHERE a.xact_start IS NOT NULL
      AND a.xact_start < NOW() - INTERVAL '10 minutes';

    -- Replication lag: the worst replica decides, and a row is returned even
    -- when no replica is connected.
    RETURN QUERY
    SELECT
        'replication_lag'::VARCHAR,
        COALESCE(MAX(pg_wal_lsn_diff(pg_current_wal_lsn(), r.replay_lsn)), 0)::TEXT || ' bytes',
        (CASE WHEN COALESCE(MAX(pg_wal_lsn_diff(pg_current_wal_lsn(), r.replay_lsn)), 0) > 10000000
              THEN 'WARNING' ELSE 'OK' END)::VARCHAR
    FROM pg_stat_replication AS r;
END;
$$ LANGUAGE plpgsql;