Memcached教程FG007-Memcached Java客户端集成实战
本文档风哥主要介绍Memcached数据库Java客户端集成相关知识,包括Memcached数据库Java客户端概述、Xmemcached客户端介绍、SpyMemcached客户端介绍、依赖规划、连接池规划、Xmemcached实战、Spring集成实战、集群配置实战等内容,风哥教程参考Memcached官方文档Client Libraries等内容编写,适合DBA人员和运维人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。
Part01-基础概念与理论知识
1.1 Memcached数据库Java客户端概述
Java是Memcached最常用的客户端语言之一,有多种成熟的客户端库可供选择。更多视频教程www.fgedu.net.cn
1.1.1 主流Java客户端对比
┌─────────────────┬─────────────────┬─────────────────┬─────────────────┐
│ 特性 │ Xmemcached │ SpyMemcached │ Memcached-Java │
│ │ │ │ Client │
├─────────────────┼─────────────────┼─────────────────┼─────────────────┤
│ 一致性哈希 │ 支持 │ 支持 │ 不支持 │
│ 连接池 │ 支持 │ 内置 │ 不支持 │
│ 异步操作 │ 支持 │ 支持 │ 不支持 │
│ 集群支持 │ 支持 │ 支持 │ 有限支持 │
│ 性能 │ 高 │ 高 │ 中 │
│ 维护状态 │ 活跃 │ 维护中 │ 停止维护 │
│ 文档完善度 │ 好 │ 好 │ 差 │
│ Spring集成 │ 支持 │ 支持 │ 不支持 │
└─────────────────┴─────────────────┴─────────────────┴─────────────────┘
# 推荐选择
1. Xmemcached(推荐)
– 功能最完善
– 性能优秀
– 持续维护
– 文档完善
2. SpyMemcached
– 稳定可靠
– 异步性能好
– 社区活跃
3. Memcached-Java-Client
– 不推荐使用
– 已停止维护
1.1.2 客户端架构
┌─────────────────────────────────────────────────────────────┐
│ 应用层 │
│ (Business Logic) │
└─────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Memcached客户端 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 核心组件 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 连接管理 │ │ 命令执行 │ │ 序列化 │ │ │
│ │ │ (NIO) │ │ (Sync/Async)│ │ (Transcoder)│ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 分布式组件 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 一致性哈希 │ │ 故障转移 │ │ 负载均衡 │ │ │
│ │ │ (Ketama) │ │ (Failover) │ │ (Balance) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Memcached服务器集群 │
│ Server1 Server2 Server3 Server4 │
└─────────────────────────────────────────────────────────────┘
1.2 Xmemcached客户端介绍
Xmemcached是一个高性能的Java Memcached客户端,学习交流加群风哥微信: itpux-com
1.2.1 Xmemcached特性
# 1. 高性能
– 基于NIO实现
– 支持连接池
– 高并发处理能力
# 2. 分布式支持
– 一致性哈希(Ketama)
– 虚拟节点支持
– 动态节点管理
# 3. 功能完善
– 同步/异步操作
– 批量操作
– CAS操作
– 统计信息获取
# 4. 易于集成
– Spring集成
– Spring Boot集成
– 配置灵活
# 5. 可靠性
– 自动重连
– 故障转移
– 超时控制
# Maven依赖
1.2.2 Xmemcached核心API
# MemcachedClient接口
public interface MemcachedClient {
// 存储操作
void set(String key, int exp, Object value);
// 获取操作
// 删除操作
boolean delete(String key);
// 增减操作
long incr(String key, long delta);
long decr(String key, long delta);
// CAS操作
// 异步操作
// 批量操作
// 统计信息
Map
}
# 常用操作示例
// 存储
client.set(“fgedu_user:1001”, 3600, userObject);
// 获取
User user = client.get(“fgedu_user:1001”);
// 删除
client.delete(“fgedu_user:1001”);
// 自增
long newValue = client.incr(“fgedu_counter”, 1);
1.3 SpyMemcached客户端介绍
1.3.1 SpyMemcached特性
# 1. 异步设计
– 所有操作默认异步
– 基于Future模式
– 高并发性能
# 2. 连接管理
– 自动重连
– 连接池管理
– 节点故障处理
# 3. 序列化
– 支持多种序列化方式
– 自定义Transcoder
# Maven依赖
# 基本使用
MemcachedClient client = new MemcachedClient(
AddrUtil.getAddresses(“192.168.1.101:11211 192.168.1.102:11211”)
);
// 异步存储
Future
// 同步获取
Object value = client.get(“fgedu_key”);
// 异步获取
Future
1.3.2 SpyMemcached核心API
# MemcachedClient接口
public class MemcachedClient {
// 存储操作(异步)
Future
Future
Future
// 获取操作
Object get(String key);
Future
// 删除操作
Future
// 增减操作
long incr(String key, int by);
long decr(String key, int by);
// CAS操作
CASValue
// 统计信息
Map
}
# CAS操作示例
// 获取带CAS标识的值
CASValue
Part02-生产环境规划与建议
2.1 Memcached数据库依赖规划
合理的依赖管理是项目稳定运行的基础。学习交流加群风哥QQ113257174
2.1.1 Maven依赖配置
# pom.xml配置
# 版本选择建议
– Xmemcached: 2.4.x(稳定版本)
– SpyMemcached: 2.12.x(稳定版本)
– Spring Boot: 3.x(最新稳定版)
2.1.2 Gradle依赖配置
# build.gradle配置
plugins {
id ‘java’
id ‘org.springframework.boot’ version ‘3.2.0’
}
repositories {
mavenCentral()
}
dependencies {
// Xmemcached(推荐)
implementation ‘com.googlecode.xmemcached:xmemcached:2.4.7’
// 或者使用SpyMemcached
// implementation ‘net.spy:spymemcached:2.12.3’
// Spring Boot
implementation ‘org.springframework.boot:spring-boot-starter’
// Jackson序列化
implementation ‘com.fasterxml.jackson.core:jackson-databind:2.15.0’
// 日志
implementation ‘org.slf4j:slf4j-api:2.0.9’
// 测试
testImplementation ‘org.springframework.boot:spring-boot-starter-test’
}
java {
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
}
2.2 Memcached数据库连接池规划
2.2.1 连接池配置建议
# Xmemcached连接池配置
# 连接池大小
– 单机应用:连接池大小 = CPU核心数 * 2
– 分布式应用:连接池大小 = CPU核心数 * 4
# 连接参数
– 连接超时:1000-5000ms
– 操作超时:3000-10000ms
– 空闲超时:30000-60000ms
# 配置示例
MemcachedClientBuilder builder = new XMemcachedClientBuilder(
AddrUtil.getAddresses(“192.168.1.101:11211,192.168.1.102:11211”)
);
// 连接池大小
builder.setConnectionPoolSize(5);
// 连接超时
builder.setConnectTimeout(3000);
// 操作超时
builder.setOpTimeout(5000);
// 启用一致性哈希
builder.setSessionLocator(new KetamaMemcachedSessionLocator());
// 启用失败模式
builder.setFailureMode(true);
MemcachedClient client = builder.build();
# 不同场景的连接池配置
# 小型应用(QPS < 1000)
builder.setConnectionPoolSize(3);
builder.setOpTimeout(5000);
# 中型应用(QPS 1000-10000)
builder.setConnectionPoolSize(5);
builder.setOpTimeout(3000);
# 大型应用(QPS > 10000)
builder.setConnectionPoolSize(10);
builder.setOpTimeout(2000);
2.2.2 连接池监控
# Xmemcached连接池状态
public class MemcachedMonitor {
private MemcachedClient client;
public void printStats() {
try {
// 获取所有服务器统计信息
Map
client.getStats();
for (Map.Entry
stats.entrySet()) {
System.out.println(“Server: ” + entry.getKey());
Map
// 关键指标
System.out.println(” 当前连接数: ” +
serverStats.get(“curr_connections”));
System.out.println(” 总连接数: ” +
serverStats.get(“total_connections”));
System.out.println(” 当前Item数: ” +
serverStats.get(“curr_items”));
System.out.println(” 内存使用: ” +
serverStats.get(“bytes”));
System.out.println(” 命中次数: ” +
serverStats.get(“get_hits”));
System.out.println(” 未命中次数: ” +
serverStats.get(“get_misses”));
// 计算命中率
long hits = Long.parseLong(serverStats.get(“get_hits”));
long misses = Long.parseLong(serverStats.get(“get_misses”));
double hitRate = (double) hits / (hits + misses) * 100;
System.out.println(” 命中率: ” + String.format(“%.2f”, hitRate) + “%”);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.3 Memcached数据库配置规划
2.3.1 配置文件规划
# application.yml配置(Spring Boot)
memcached:
servers:
– host: 192.168.1.101
port: 11211
– host: 192.168.1.102
port: 11211
– host: 192.168.1.103
port: 11211
– host: 192.168.1.104
port: 11211
pool:
size: 5
timeout:
connect: 3000
op: 5000
session:
locator: ketama
failure:
mode: true
# 配置类
@Configuration
@ConfigurationProperties(prefix = “memcached”)
public class MemcachedProperties {
private List
private PoolConfig pool;
private TimeoutConfig timeout;
private String sessionLocator = “ketama”;
private boolean failureMode = true;
// getters and setters
}
# ServerConfig
public class ServerConfig {
private String host;
private int port = 11211;
// getters and setters
}
2.3.2 配置参数说明
# 服务器配置
servers: Memcached服务器列表
host: 服务器IP地址
port: 服务器端口(默认11211)
# 连接池配置
pool:
size: 连接池大小(默认5)
# 超时配置
timeout:
connect: 连接超时(毫秒,默认3000)
op: 操作超时(毫秒,默认5000)
# 分布策略
session:
locator: 分布策略
– ketama: 一致性哈希(推荐)
– array: 简单取模
# 故障处理
failure:
mode: 是否启用失败模式(默认true)
– true: 节点故障时自动切换
– false: 节点故障时抛出异常
# 其他配置
– sanitizeKeys: 是否清理Key中的特殊字符
– commandFactory: 命令工厂(默认TextCommandFactory)
– transcoder: 序列化器(默认SerializingTranscoder)
Part03-生产环境项目实施方案
3.1 Memcached数据库Xmemcached实战
3.1.1 基础配置实战
# 创建配置类
package com.fgedu.cache.config;
import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.MemcachedClientBuilder;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.utils.AddrUtil;
import net.rubyeye.xmemcached.command.TextCommandFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MemcachedConfig {
@Value(“${memcached.servers}”)
private String servers;
@Value(“${memcached.pool.size:5}”)
private int poolSize;
@Value(“${memcached.timeout.connect:3000}”)
private long connectTimeout;
@Value(“${memcached.timeout.op:5000}”)
private long opTimeout;
@Bean
public MemcachedClient memcachedClient() throws Exception {
// 创建Builder
MemcachedClientBuilder builder = new XMemcachedClientBuilder(
AddrUtil.getAddresses(servers)
);
// 配置连接池
builder.setConnectionPoolSize(poolSize);
// 配置超时
builder.setConnectTimeout(connectTimeout);
builder.setOpTimeout(opTimeout);
// 启用一致性哈希
builder.setSessionLocator(
new net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator()
);
// 设置命令工厂
builder.setCommandFactory(new TextCommandFactory());
// 启用失败模式
builder.setFailureMode(true);
return builder.build();
}
}
# application.yml
memcached:
servers: 192.168.1.101:11211,192.168.1.102:11211,192.168.1.103:11211,192.168.1.104:11211
pool:
size: 5
timeout:
connect: 3000
op: 5000
3.1.2 基本操作实战
package com.fgedu.cache.service;
import net.rubyeye.xmemcached.MemcachedClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class MemcachedService {
@Autowired
private MemcachedClient memcachedClient;
/**
* 存储数据
*/
public void set(String key, int exp, Object value) {
try {
memcachedClient.set(key, exp, value);
System.out.println(“存储成功: ” + key);
} catch (Exception e) {
System.err.println(“存储失败: ” + e.getMessage());
}
}
/**
* 获取数据
*/
public
try {
T value = memcachedClient.get(key);
if (value != null) {
System.out.println(“获取成功: ” + key);
} else {
System.out.println(“数据不存在: ” + key);
}
return value;
} catch (Exception e) {
System.err.println(“获取失败: ” + e.getMessage());
return null;
}
}
/**
* 删除数据
*/
public boolean delete(String key) {
try {
boolean result = memcachedClient.delete(key);
System.out.println(“删除” + (result ? “成功” : “失败”) + “: ” + key);
return result;
} catch (Exception e) {
System.err.println(“删除失败: ” + e.getMessage());
return false;
}
}
/**
* 自增操作
*/
public long incr(String key, long delta) {
try {
long result = memcachedClient.incr(key, delta);
System.out.println(“自增结果: ” + result);
return result;
} catch (Exception e) {
System.err.println(“自增失败: ” + e.getMessage());
return -1;
}
}
/**
* CAS操作
*/
public
try {
// 获取当前值和CAS标识
net.rubyeye.xmemcached.CASValue
memcachedClient.gets(key);
if (casValue == null) {
System.out.println(“数据不存在,无法CAS”);
return false;
}
// 尝试更新
boolean result = memcachedClient.cas(key, exp, newValue,
casValue.getCas());
System.out.println(“CAS” + (result ? “成功” : “失败”));
return result;
} catch (Exception e) {
System.err.println(“CAS失败: ” + e.getMessage());
return false;
}
}
}
# 使用示例
@Service
public class UserService {
@Autowired
private MemcachedService cacheService;
public void cacheUser() {
// 存储用户
User user = new User(1001, “fgedu”, “fgedu@fgedu.net.cn”);
cacheService.set(“fgedu_user:1001”, 3600, user);
// 获取用户
User cachedUser = cacheService.get(“fgedu_user:1001”);
System.out.println(“用户: ” + cachedUser);
// 更新用户(CAS)
cachedUser.setEmail(“newemail@fgedu.net.cn”);
cacheService.cas(“fgedu_user:1001”, 3600, cachedUser);
// 删除用户
cacheService.delete(“fgedu_user:1001”);
}
}
3.2 Memcached数据库Spring集成实战
3.2.1 Spring Boot集成配置
# 1. 添加依赖
# 2. 配置类
package com.fgedu.cache.config;
import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.MemcachedClientBuilder;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.utils.AddrUtil;
import net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableConfigurationProperties(MemcachedProperties.class)
public class MemcachedAutoConfiguration {
@Bean(destroyMethod = “shutdown”)
public MemcachedClient memcachedClient(MemcachedProperties properties)
throws Exception {
MemcachedClientBuilder builder = new XMemcachedClientBuilder(
AddrUtil.getAddresses(properties.getServers())
);
builder.setConnectionPoolSize(properties.getPool().getSize());
builder.setConnectTimeout(properties.getTimeout().getConnect());
builder.setOpTimeout(properties.getTimeout().getOp());
builder.setSessionLocator(new KetamaMemcachedSessionLocator());
builder.setFailureMode(properties.isFailureMode());
return builder.build();
}
}
# 3. 属性配置类
package com.fgedu.cache.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = “memcached”)
public class MemcachedProperties {
private String servers;
private PoolConfig pool = new PoolConfig();
private TimeoutConfig timeout = new TimeoutConfig();
private boolean failureMode = true;
// getters and setters
public static class PoolConfig {
private int size = 5;
// getter and setter
}
public static class TimeoutConfig {
private long connect = 3000;
private long op = 5000;
// getters and setters
}
}
3.2.2 缓存抽象层实现
package com.fgedu.cache.service;
import net.rubyeye.xmemcached.MemcachedClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Service
public class CacheService {
private static final Logger logger = LoggerFactory.getLogger(CacheService.class);
@Autowired
private MemcachedClient memcachedClient;
// 默认过期时间(1小时)
private static final int DEFAULT_EXPIRE = 3600;
/**
* 存储数据
*/
public
return put(key, DEFAULT_EXPIRE, value);
}
public
try {
memcachedClient.set(key, expire, value);
logger.debug(“Cache put: {}”, key);
return true;
} catch (Exception e) {
logger.error(“Cache put error: {}”, key, e);
return false;
}
}
/**
* 获取数据
*/
public
try {
T value = memcachedClient.get(key);
logger.debug(“Cache get: {}, hit: {}”, key, value != null);
return value;
} catch (Exception e) {
logger.error(“Cache get error: {}”, key, e);
return null;
}
}
/**
* 获取数据,不存在则加载
*/
public
T value = get(key);
if (value == null) {
value = loader.load();
if (value != null) {
put(key, expire, value);
}
}
return value;
}
/**
* 删除数据
*/
public boolean delete(String key) {
try {
boolean result = memcachedClient.delete(key);
logger.debug(“Cache delete: {}, result: {}”, key, result);
return result;
} catch (Exception e) {
logger.error(“Cache delete error: {}”, key, e);
return false;
}
}
/**
* 批量获取
*/
public
try {
Map
logger.debug(“Cache multiGet: {} keys, {} hits”,
keys.size(), result.size());
return result;
} catch (Exception e) {
logger.error(“Cache multiGet error”, e);
return Collections.emptyMap();
}
}
/**
* 异步存储
*/
public
try {
memcachedClient.setWithNoReply(key, expire, value);
logger.debug(“Cache asyncPut: {}”, key);
} catch (Exception e) {
logger.error(“Cache asyncPut error: {}”, key, e);
}
}
/**
* 缓存加载器接口
*/
@FunctionalInterface
public interface CacheLoader
T load();
}
}
3.3 Memcached数据库集群配置实战
3.3.1 集群配置实现
package com.fgedu.cache.cluster;
import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.MemcachedClientBuilder;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.utils.AddrUtil;
import net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator;
import net.rubyeye.xmemcached.auth.AuthInfo;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.*;
@Component
public class MemcachedClusterConfig {
// 集群节点配置
private static final String CLUSTER_NODES =
“192.168.1.101:11211 ” +
“192.168.1.102:11211 ” +
“192.168.1.103:11211 ” +
“192.168.1.104:11211”;
// 虚拟节点数
private static final int VIRTUAL_NODES = 160;
/**
* 创建集群客户端
*/
public MemcachedClient createClusterClient() throws Exception {
MemcachedClientBuilder builder = new XMemcachedClientBuilder(
AddrUtil.getAddresses(CLUSTER_NODES)
);
// 一致性哈希
builder.setSessionLocator(new KetamaMemcachedSessionLocator());
// 连接池配置
builder.setConnectionPoolSize(5);
// 超时配置
builder.setConnectTimeout(3000);
builder.setOpTimeout(5000);
// 失败模式
builder.setFailureMode(true);
// 健康检查
builder.setHealSessionInterval(5000);
return builder.build();
}
/**
* 动态添加节点
*/
public void addNode(MemcachedClient client, String host, int port)
throws Exception {
InetSocketAddress address = new InetSocketAddress(host, port);
client.addServer(address);
System.out.println(“添加节点: ” + host + “:” + port);
}
/**
* 动态移除节点
*/
public void removeNode(MemcachedClient client, String host, int port)
throws Exception {
InetSocketAddress address = new InetSocketAddress(host, port);
client.removeServer(address);
System.out.println(“移除节点: ” + host + “:” + port);
}
/**
* 获取集群状态
*/
public void printClusterStatus(MemcachedClient client) throws Exception {
Map
System.out.println(“集群状态:”);
System.out.println(“====================================”);
for (Map.Entry
stats.entrySet()) {
InetSocketAddress addr = entry.getKey();
Map
System.out.println(“节点: ” + addr.getHostString() + “:” + addr.getPort());
System.out.println(” 版本: ” + serverStats.get(“version”));
System.out.println(” 运行时间: ” + serverStats.get(“uptime”) + “秒”);
System.out.println(” 当前Item数: ” + serverStats.get(“curr_items”));
System.out.println(” 内存使用: ” + formatBytes(
Long.parseLong(serverStats.get(“bytes”))));
System.out.println(” 命中率: ” + calculateHitRate(serverStats) + “%”);
System.out.println();
}
}
private String formatBytes(long bytes) {
if (bytes < 1024) return bytes + " B";
if (bytes < 1024 * 1024) return (bytes / 1024) + " KB";
if (bytes < 1024 * 1024 * 1024) return (bytes / 1024 / 1024) + " MB";
return (bytes / 1024 / 1024 / 1024) + " GB";
}
private String calculateHitRate(Map
long hits = Long.parseLong(stats.getOrDefault(“get_hits”, “0”));
long misses = Long.parseLong(stats.getOrDefault(“get_misses”, “0”));
if (hits + misses == 0) return “0.00”;
return String.format(“%.2f”, (double) hits / (hits + misses) * 100);
}
}
3.3.2 故障转移配置
package com.fgedu.cache.failover;
import net.rubyeye.xmemcached.MemcachedClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.*;
@Component
public class FailoverManager {
private static final Logger logger = LoggerFactory.getLogger(FailoverManager.class);
private final MemcachedClient client;
private final ScheduledExecutorService scheduler;
private final Set
ConcurrentHashMap.newKeySet();
public FailoverManager(MemcachedClient client) {
this.client = client;
this.scheduler = Executors.newScheduledThreadPool(1);
startHealthCheck();
}
/**
* 启动健康检查
*/
private void startHealthCheck() {
scheduler.scheduleAtFixedRate(() -> {
try {
checkNodes();
} catch (Exception e) {
logger.error(“健康检查失败”, e);
}
}, 0, 10, TimeUnit.SECONDS);
}
/**
* 检查节点健康状态
*/
private void checkNodes() {
Map
for (InetSocketAddress addr : stats.keySet()) {
boolean isHealthy = checkNodeHealth(addr);
if (!isHealthy && !failedNodes.contains(addr)) {
failedNodes.add(addr);
logger.warn(“节点故障: {}”, addr);
onNodeFailure(addr);
} else if (isHealthy && failedNodes.contains(addr)) {
failedNodes.remove(addr);
logger.info(“节点恢复: {}”, addr);
onNodeRecovery(addr);
}
}
}
/**
* 检查单个节点健康状态
*/
private boolean checkNodeHealth(InetSocketAddress addr) {
try {
Map
return stats != null && !stats.isEmpty();
} catch (Exception e) {
return false;
}
}
/**
* 节点故障处理
*/
private void onNodeFailure(InetSocketAddress addr) {
// 发送告警
sendAlert(“节点故障: ” + addr);
// 可以在这里实现自动摘除逻辑
}
/**
* 节点恢复处理
*/
private void onNodeRecovery(InetSocketAddress addr) {
// 发送通知
sendAlert(“节点恢复: ” + addr);
// 预热节点
warmupNode(addr);
}
/**
* 预热节点
*/
private void warmupNode(InetSocketAddress addr) {
// 加载热点数据到恢复的节点
logger.info(“预热节点: {}”, addr);
}
/**
* 发送告警
*/
private void sendAlert(String message) {
logger.warn(“告警: {}”, message);
// 实现邮件/短信告警
}
/**
* 关闭
*/
public void shutdown() {
scheduler.shutdown();
}
}
Part04-生产案例与实战讲解
4.1 Memcached数据库缓存服务封装案例
以下是一个完整的缓存服务封装案例。更多学习教程公众号风哥教程itpux_com
4.1.1 缓存服务完整实现
package com.fgedu.cache.service;
import net.rubyeye.xmemcached.MemcachedClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.*;
@Service
public class DistributedCacheService {
private static final Logger logger =
LoggerFactory.getLogger(DistributedCacheService.class);
@Autowired
private MemcachedClient memcachedClient;
// Key前缀
private static final String KEY_PREFIX = “fgedu_”;
// 默认过期时间
private static final int DEFAULT_EXPIRE = 3600;
/**
* 构建缓存Key
*/
private String buildKey(String namespace, String key) {
return KEY_PREFIX + namespace + “:” + key;
}
/**
* 存储数据
*/
public
return set(namespace, key, DEFAULT_EXPIRE, value);
}
public
String cacheKey = buildKey(namespace, key);
try {
memcachedClient.set(cacheKey, expire, value);
logger.debug(“Set cache: {}”, cacheKey);
return true;
} catch (Exception e) {
logger.error(“Set cache error: {}”, cacheKey, e);
return false;
}
}
/**
* 获取数据
*/
public
String cacheKey = buildKey(namespace, key);
try {
T value = memcachedClient.get(cacheKey);
logger.debug(“Get cache: {}, hit: {}”, cacheKey, value != null);
return value;
} catch (Exception e) {
logger.error(“Get cache error: {}”, cacheKey, e);
return null;
}
}
/**
* 获取或加载(缓存穿透保护)
*/
public
CacheLoader
String cacheKey = buildKey(namespace, key);
try {
T value = memcachedClient.get(cacheKey);
if (value != null) {
logger.debug(“Cache hit: {}”, cacheKey);
return value;
}
// 缓存未命中,加载数据
logger.debug(“Cache miss: {}”, cacheKey);
value = loader.load();
if (value != null) {
memcachedClient.set(cacheKey, expire, value);
} else {
// 防止缓存穿透,存储空值
memcachedClient.set(cacheKey, 60, new NullValue());
}
return value;
} catch (Exception e) {
logger.error(“Get or load error: {}”, cacheKey, e);
return loader.load();
}
}
/**
* 删除数据
*/
public boolean delete(String namespace, String key) {
String cacheKey = buildKey(namespace, key);
try {
boolean result = memcachedClient.delete(cacheKey);
logger.debug(“Delete cache: {}, result: {}”, cacheKey, result);
return result;
} catch (Exception e) {
logger.error(“Delete cache error: {}”, cacheKey, e);
return false;
}
}
/**
* 批量获取
*/
public
Collection
List
for (String key : keys) {
cacheKeys.add(buildKey(namespace, key));
}
try {
Map
// 转换Key
Map
for (Map.Entry
String originalKey = entry.getKey()
.substring((KEY_PREFIX + namespace + “:”).length());
finalResult.put(originalKey, entry.getValue());
}
return finalResult;
} catch (Exception e) {
logger.error(“Multi get error”, e);
return Collections.emptyMap();
}
}
/**
* 计数器
*/
public long increment(String namespace, String key, long delta) {
String cacheKey = buildKey(namespace, key);
try {
return memcachedClient.incr(cacheKey, delta, 0);
} catch (Exception e) {
logger.error(“Increment error: {}”, cacheKey, e);
return -1;
}
}
/**
* 缓存加载器接口
*/
@FunctionalInterface
public interface CacheLoader
T load();
}
/**
* 空值标记
*/
private static class NullValue implements Serializable {
private static final long serialVersionUID = 1L;
}
}
4.1.2 使用示例
package com.fgedu.service;
import com.fgedu.cache.service.DistributedCacheService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class UserService {
@Autowired
private DistributedCacheService cacheService;
@Autowired
private UserRepository userRepository;
/**
* 获取用户(带缓存)
*/
public User getUser(Long userId) {
return cacheService.getOrLoad(“user”, userId.toString(), 3600,
() -> userRepository.findById(userId));
}
/**
* 更新用户(更新缓存)
*/
public void updateUser(User user) {
userRepository.save(user);
cacheService.set(“user”, user.getId().toString(), 3600, user);
}
/**
* 删除用户(清除缓存)
*/
public void deleteUser(Long userId) {
userRepository.deleteById(userId);
cacheService.delete(“user”, userId.toString());
}
/**
* 批量获取用户
*/
public Map
List
for (Long id : userIds) {
keys.add(id.toString());
}
return cacheService.multiGet(“user”, keys);
}
/**
* 用户访问计数
*/
public long incrementVisitCount(Long userId) {
return cacheService.increment(“visit”, userId.toString(), 1);
}
}
4.2 Memcached数据库Session缓存案例
4.2.1 Session存储实现
package com.fgedu.session;
import net.rubyeye.xmemcached.MemcachedClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.session.Session;
import org.springframework.session.SessionRepository;
import org.springframework.stereotype.Repository;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
@Repository
public class MemcachedSessionRepository implements SessionRepository
@Autowired
private MemcachedClient memcachedClient;
private static final String SESSION_PREFIX = “fgedu_session:”;
private static final int DEFAULT_MAX_INACTIVE_INTERVAL = 1800; // 30分钟
@Override
public Session createSession() {
MemcachedSession session = new MemcachedSession();
session.setMaxInactiveInterval(Duration.ofSeconds(DEFAULT_MAX_INACTIVE_INTERVAL));
return session;
}
@Override
public void save(Session session) {
String key = SESSION_PREFIX + session.getId();
int expire = (int) session.getMaxInactiveInterval().getSeconds();
try {
Map
sessionData.put(“id”, session.getId());
sessionData.put(“creationTime”, session.getCreationTime().toEpochMilli());
sessionData.put(“lastAccessedTime”, session.getLastAccessedTime().toEpochMilli());
sessionData.put(“maxInactiveInterval”, session.getMaxInactiveInterval().getSeconds());
sessionData.put(“attributes”, new HashMap<>(session.getAttributeNames()));
memcachedClient.set(key, expire, sessionData);
} catch (Exception e) {
throw new RuntimeException(“保存Session失败”, e);
}
}
@Override
public Session findById(String id) {
String key = SESSION_PREFIX + id;
try {
Map
if (sessionData == null) {
return null;
}
MemcachedSession session = new MemcachedSession(id);
session.setCreationTime(Instant.ofEpochMilli(
(Long) sessionData.get(“creationTime”)));
session.setLastAccessedTime(Instant.ofEpochMilli(
(Long) sessionData.get(“lastAccessedTime”)));
session.setMaxInactiveInterval(Duration.ofSeconds(
(Long) sessionData.get(“maxInactiveInterval”)));
@SuppressWarnings(“unchecked”)
Map
(Map
for (Map.Entry
session.setAttribute(entry.getKey(), entry.getValue());
}
return session;
} catch (Exception e) {
throw new RuntimeException(“获取Session失败”, e);
}
}
@Override
public void deleteById(String id) {
String key = SESSION_PREFIX + id;
try {
memcachedClient.delete(key);
} catch (Exception e) {
throw new RuntimeException(“删除Session失败”, e);
}
}
}
# MemcachedSession类
public class MemcachedSession implements Session {
private String id;
private Instant creationTime;
private Instant lastAccessedTime;
private Duration maxInactiveInterval;
private Map
public MemcachedSession() {
this.id = UUID.randomUUID().toString();
this.creationTime = Instant.now();
this.lastAccessedTime = this.creationTime;
}
public MemcachedSession(String id) {
this.id = id;
}
// 实现Session接口方法…
}
4.3 Memcached数据库数据缓存案例
4.3.1 商品缓存案例
package com.fgedu.product.service;
import com.fgedu.cache.service.DistributedCacheService;
import com.fgedu.product.entity.Product;
import com.fgedu.product.repository.ProductRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.*;
@Service
public class ProductService {
@Autowired
private DistributedCacheService cacheService;
@Autowired
private ProductRepository productRepository;
// 缓存过期时间:5分钟
private static final int CACHE_EXPIRE = 300;
// 热点商品缓存时间:30分钟
private static final int HOT_CACHE_EXPIRE = 1800;
// 热点商品ID集合
private Set
/**
* 获取商品信息
*/
public Product getProduct(Long productId) {
int expire = hotProductIds.contains(productId) ?
HOT_CACHE_EXPIRE : CACHE_EXPIRE;
return cacheService.getOrLoad(“product”, productId.toString(), expire,
() -> productRepository.findById(productId));
}
/**
* 更新商品信息
*/
public void updateProduct(Product product) {
productRepository.save(product);
// 更新缓存
int expire = hotProductIds.contains(product.getId()) ?
HOT_CACHE_EXPIRE : CACHE_EXPIRE;
cacheService.set(“product”, product.getId().toString(), expire, product);
}
/**
* 删除商品
*/
public void deleteProduct(Long productId) {
productRepository.deleteById(productId);
// 清除缓存
cacheService.delete(“product”, productId.toString());
hotProductIds.remove(productId);
}
/**
* 批量获取商品
*/
public Map
List
for (Long id : productIds) {
keys.add(id.toString());
}
Map
// 查找未命中的商品
List
for (Long id : productIds) {
if (!cachedProducts.containsKey(id.toString())) {
missedIds.add(id);
}
}
// 加载未命中的商品
if (!missedIds.isEmpty()) {
List
for (Product product : products) {
cachedProducts.put(product.getId().toString(), product);
// 写入缓存
int expire = hotProductIds.contains(product.getId()) ?
HOT_CACHE_EXPIRE : CACHE_EXPIRE;
cacheService.set(“product”, product.getId().toS
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
