
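Memcached Tutorial FG007: Memcached Java Client Integration in Practice

This tutorial covers integrating Memcached with Java clients: an overview of the available Java clients, introductions to Xmemcached and SpyMemcached, dependency planning, connection pool planning, hands-on Xmemcached usage, Spring integration, and cluster configuration. It is written with reference to the Client Libraries section of the official Memcached documentation and is aimed at DBAs and operations engineers for learning and testing. Verify everything yourself before applying it in production.

Part01-Basic Concepts and Theory

1.1 Overview of Memcached Java Clients

Java is one of the most commonly used client languages for Memcached, and several mature client libraries are available.

1.1.1 Comparison of the Main Java Clients

# Memcached Java client comparison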

┌────────────────────┬────────────┬─────────────────────────┬───────────────────────┐
│ Feature            │ Xmemcached │ SpyMemcached            │ Memcached-Java-Client │
├────────────────────┼────────────┼─────────────────────────┼───────────────────────┤
│ Consistent hashing │ Yes        │ Yes                     │ Yes                   │
│ Connection pool    │ Yes        │ Built in (one per node) │ Yes (SockIOPool)      │
│ Async operations   │ Yes        │ Yes                     │ No                    │
│ Cluster support    │ Yes        │ Yes                     │ Limited               │
│ Performance        │ High       │ High                    │ Medium                │
│ Maintenance        │ Active     │ Maintained              │ Discontinued          │
│ Documentation      │ Good       │ Good                    │ Poor                  │
│ Spring integration │ Yes        │ Yes                     │ No                    │
└────────────────────┴────────────┴─────────────────────────┴───────────────────────┘

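# Recommendations
1. Xmemcached (recommended)
– Most complete feature set
– Good performance
– Actively maintained
– Well documented

2. SpyMemcached
– Stable and reliable
– Good asynchronous performance
– Active community

3. Memcached-Java-Client
– Not recommended
– No longer maintained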
1.1.2 Client Architecture

# Java client architecture

┌─────────────────────────────────────────────────────────────┐
│                      Application layer                      │
│                      (Business Logic)                       │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│                      Memcached client                       │
│   ┌─────────────────────────────────────────────────────┐   │
│   │                   Core components                   │   │
│   │   ┌─────────────┐ ┌─────────────┐ ┌─────────────┐   │   │
│   │   │ Connections │ │  Commands   │ │ Serializer  │   │   │
│   │   │    (NIO)    │ │ (Sync/Async)│ │ (Transcoder)│   │   │
│   │   └─────────────┘ └─────────────┘ └─────────────┘   │   │
│   └─────────────────────────────────────────────────────┘   │
│   ┌─────────────────────────────────────────────────────┐   │
│   │               Distributed components                │   │
│   │   ┌─────────────┐ ┌─────────────┐ ┌─────────────┐   │   │
│   │   │Consist. hash│ │  Failover   │ │ Load balance│   │   │
│   │   │  (Ketama)   │ │ (Failover)  │ │  (Balance)  │   │   │
│   │   └─────────────┘ └─────────────┘ └─────────────┘   │   │
│   └─────────────────────────────────────────────────────┘   │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│                  Memcached server cluster                   │
│          Server1    Server2    Server3    Server4           │
└─────────────────────────────────────────────────────────────┘

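1.2 Introduction to the Xmemcached Client

Xmemcached is a high-performance Java client for Memcached.

1.2.1 Xmemcached Features

# Main Xmemcached features

# 1. High performance
– Built on NIO
– Connection pool support
– Handles high concurrency well

# 2. Distributed support
– Consistent hashing (Ketama)
– Virtual node support
– Dynamic node management

# 3. Complete feature set
– Synchronous operations plus asynchronous-style no-reply variants
– Bulk operations
– CAS operations
– Access to server statistics

# 4. Easy to integrate
– Spring integration
– Spring Boot integration
– Flexible configuration

# 5. Reliability
– Automatic reconnection
– Failover
– Timeout control

# Maven dependency
<dependency>
    <groupId>com.googlecode.xmemcached</groupId>
    <artifactId>xmemcached</artifactId>
    <version>2.4.7</version>
</dependency>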
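To make the consistent hashing and virtual node items above more concrete, here is a minimal sketch of a hash ring. It is not Xmemcached's KetamaMemcachedSessionLocator: the class name HashRingSketch, the "node#i" naming of virtual nodes, and the use of the first four MD5 bytes as the hash are all assumptions chosen for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative hash ring (hypothetical class, not part of any client library). */
final class HashRingSketch {
    // Ring position -> owning node; TreeMap keeps positions sorted.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    HashRingSketch(List<String> nodes, int virtualNodes) {
        for (String node : nodes) {
            // Each physical node is placed on the ring several times.
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
    }

    /** Returns the owner of the key: the first ring point at or after the key's hash. */
    String locate(String key) {
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(key));
        // Past the last point, wrap around to the first one.
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    /** First 4 bytes of the MD5 digest, read as an unsigned 32-bit value. */
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            return ((long) (d[3] & 0xFF) << 24) | ((long) (d[2] & 0xFF) << 16)
                    | ((long) (d[1] & 0xFF) << 8) | (d[0] & 0xFF);
        } catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException("MD5 not available", ex);
        }
    }
}
```

With this layout, removing a node only moves the keys that node owned; keys owned by the surviving nodes keep their owner, which is the property that makes consistent hashing attractive for cache clusters.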

1.2.2 Xmemcached Core API

# Xmemcached core API

# MemcachedClient interface (excerpt; checked exceptions omitted for brevity)
public interface MemcachedClient {
    // Store
    boolean set(String key, int exp, Object value);
    <T> boolean set(String key, int exp, T value, Transcoder<T> transcoder);

    // Retrieve
    <T> T get(String key);
    <T> T get(String key, Transcoder<T> transcoder);

    // Delete
    boolean delete(String key);

    // Increment / decrement
    long incr(String key, long delta);
    long decr(String key, long delta);

    // CAS: gets returns the value together with its CAS token
    <T> GetsResponse<T> gets(String key);
    boolean cas(String key, int exp, Object value, long cas);

    // Asynchronous-style store: sends the command without waiting for a reply
    void setWithNoReply(String key, int exp, Object value);

    // Bulk get
    <T> Map<String, T> get(Collection<String> keyCollection);

    // Statistics, per server
    Map<InetSocketAddress, Map<String, String>> getStats();
}

# Common operations
// Store
client.set("fgedu_user:1001", 3600, userObject);

// Retrieve
User user = client.get("fgedu_user:1001");

// Delete
client.delete("fgedu_user:1001");

// Increment
long newValue = client.incr("fgedu_counter", 1);

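1.3 Introduction to the SpyMemcached Client

1.3.1 SpyMemcached Features

# Main SpyMemcached features

# 1. Asynchronous design
– Store operations are asynchronous by default
– Based on the Future pattern
– Good performance under high concurrency

# 2. Connection management
– Automatic reconnection
– Connections are managed internally by the client
– Handling of failed nodes

# 3. Serialization
– Supports several serialization approaches
– Custom Transcoder

# Maven dependency
<dependency>
    <groupId>net.spy</groupId>
    <artifactId>spymemcached</artifactId>
    <version>2.12.3</version>
</dependency>

# Basic usage
// The constructor throws IOException if the client cannot be initialized
MemcachedClient client = new MemcachedClient(
    AddrUtil.getAddresses("192.168.1.101:11211 192.168.1.102:11211")
);

// Asynchronous store
Future<Boolean> future = client.set("fgedu_key", 3600, "value");

// Synchronous get
Object value = client.get("fgedu_key");

// Asynchronous get; get() without a timeout blocks until the reply arrives
Future<Object> asyncFuture = client.asyncGet("fgedu_key");
Object asyncValue = asyncFuture.get();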
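Because a Future-based get can block for as long as the server takes to answer, callers often bound the wait. The following is a minimal sketch using only java.util.concurrent; BoundedGetSketch and getOrFallback are hypothetical names, and a CompletableFuture stands in for the future a client would return.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: waits for a cache future with an upper bound. */
final class BoundedGetSketch {
    /** Waits at most timeoutMillis; on timeout or failure returns the fallback. */
    static <T> T getOrFallback(Future<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Give up on the pending request so it does not linger.
            future.cancel(false);
            return fallback;
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            return fallback;
        } catch (ExecutionException e) {
            return fallback;
        }
    }
}
```

Treating a slow cache as a miss keeps request latency bounded; the caller can then fall back to the primary data source.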

1.3.2 SpyMemcached Core API

# SpyMemcached core API

# MemcachedClient class (excerpt)
public class MemcachedClient {
    // Store (asynchronous)
    Future<Boolean> set(String key, int exp, Object value);
    Future<Boolean> add(String key, int exp, Object value);
    Future<Boolean> replace(String key, int exp, Object value);

    // Retrieve
    Object get(String key);
    Future<Object> asyncGet(String key);
    Map<String, Object> getBulk(Collection<String> keys);

    // Delete
    Future<Boolean> delete(String key);

    // Increment / decrement
    long incr(String key, int by);
    long decr(String key, int by);

    // CAS
    CASValue<Object> gets(String key);
    CASResponse cas(String key, long casId, int exp, Object value);

    // Statistics, per server
    Map<SocketAddress, Map<String, String>> getStats();
}

# CAS example
// Fetch the value together with its CAS token
CASValue<Object> casValue = client.gets("fgedu_balance:1001");
if (casValue != null) {
    // Try to update; newValue is the updated value computed by the caller
    CASResponse response = client.cas("fgedu_balance:1001",
        casValue.getCas(), 3600, newValue);
    if (response == CASResponse.OK) {
        System.out.println("Update succeeded");
    } else {
        System.out.println("Update failed: the value was modified in the meantime");
    }
}
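The CAS example above makes a single attempt. In practice, callers usually re-read and retry when the token is stale. The sketch below shows that loop against an in-memory stand-in, so it runs without a server; VersionedStore and CasRetrySketch are hypothetical classes that only mimic the gets/cas contract.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongUnaryOperator;

/** In-memory stand-in for a memcached node (hypothetical, for illustration only). */
final class VersionedStore {
    /** A value plus its CAS token, like the result of a gets call. */
    record Versioned(long value, long cas) {}

    private final ConcurrentHashMap<String, Versioned> data = new ConcurrentHashMap<>();

    /** Unconditional write; bumps the CAS token if the key already exists. */
    void set(String key, long value) {
        data.merge(key, new Versioned(value, 1),
                (old, fresh) -> new Versioned(value, old.cas() + 1));
    }

    Versioned gets(String key) {
        return data.get(key);
    }

    /** Succeeds only if the stored CAS token still equals expectedCas. */
    boolean cas(String key, long expectedCas, long newValue) {
        Versioned current = data.get(key);
        if (current == null || current.cas() != expectedCas) {
            return false;
        }
        return data.replace(key, current, new Versioned(newValue, expectedCas + 1));
    }
}

final class CasRetrySketch {
    /** Optimistic update with retries; false if the key is missing or retries run out. */
    static boolean update(VersionedStore store, String key,
                          LongUnaryOperator change, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            VersionedStore.Versioned seen = store.gets(key);
            if (seen == null) {
                return false;
            }
            if (store.cas(key, seen.cas(), change.applyAsLong(seen.value()))) {
                return true;
            }
            // Token was stale: another writer won, so re-read and try again.
        }
        return false;
    }
}
```

Bounding the number of retries avoids spinning forever on a heavily contended key.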

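Part02-Production Planning and Recommendations

2.1 Memcached Dependency Planning

Sound dependency management is the foundation of a stable project.

2.1.1 Maven Dependency Configuration

# Maven dependency configuration

# pom.xml
<dependencies>
    <!-- Xmemcached client -->
    <dependency>
        <groupId>com.googlecode.xmemcached</groupId>
        <artifactId>xmemcached</artifactId>
        <version>2.4.7</version>
    </dependency>

    <!-- Spring Boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>3.2.0</version>
    </dependency>

    <!-- Jackson serialization -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.15.0</version>
    </dependency>

    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.9</version>
    </dependency>
</dependencies>

# Version recommendations
– Xmemcached: 2.4.x (stable)
– SpyMemcached: 2.12.x (stable)
– Spring Boot: 3.x (current stable line)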

2.1.2 Gradle Dependency Configuration

# Gradle dependency configuration

# build.gradle
plugins {
    id 'java'
    id 'org.springframework.boot' version '3.2.0'
    // Needed so the Spring Boot dependencies below can omit their versions
    id 'io.spring.dependency-management' version '1.1.4'
}

repositories {
    mavenCentral()
}

dependencies {
    // Xmemcached (recommended)
    implementation 'com.googlecode.xmemcached:xmemcached:2.4.7'

    // Or use SpyMemcached instead
    // implementation 'net.spy:spymemcached:2.12.3'

    // Spring Boot
    implementation 'org.springframework.boot:spring-boot-starter'

    // Jackson serialization
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.0'

    // Logging
    implementation 'org.slf4j:slf4j-api:2.0.9'

    // Testing
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

java {
    sourceCompatibility = JavaVersion.VERSION_17
    targetCompatibility = JavaVersion.VERSION_17
}

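2.2 Memcached Connection Pool Planning

2.2.1 Connection Pool Configuration Recommendations

# Connection pool configuration recommendations

# Xmemcached connection pool
# Pool size (rules of thumb; confirm with load tests)
– Standalone application: pool size = CPU cores * 2
– Distributed application: pool size = CPU cores * 4

# Connection parameters
– Connect timeout: 1000-5000 ms
– Operation timeout: 3000-10000 ms
– Idle timeout: 30000-60000 ms

# Configuration example
MemcachedClientBuilder builder = new XMemcachedClientBuilder(
    // Server entries are separated by spaces
    AddrUtil.getAddresses("192.168.1.101:11211 192.168.1.102:11211")
);

// Connection pool size
builder.setConnectionPoolSize(5);

// Connect timeout (ms)
builder.setConnectTimeout(3000);

// Operation timeout (ms)
builder.setOpTimeout(5000);

// Use consistent hashing
builder.setSessionLocator(new KetamaMemcachedSessionLocator());

// Enable failure mode: a failed node stays in the node list
// and requests routed to it fail instead of moving to another node
builder.setFailureMode(true);

MemcachedClient client = builder.build();

# Pool settings for different workloads

# Small application (QPS < 1000)
builder.setConnectionPoolSize(3);
builder.setOpTimeout(5000);

# Medium application (QPS 1000-10000)
builder.setConnectionPoolSize(5);
builder.setOpTimeout(3000);

# Large application (QPS > 10000)
builder.setConnectionPoolSize(10);
builder.setOpTimeout(2000);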
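The sizing rules and workload tiers above can be captured in a small helper so that the numbers live in one place. This is only a sketch of the figures quoted in this section; PoolSizingSketch, Settings, and forQps are hypothetical names, and the thresholds are the tutorial's rules of thumb rather than library defaults.

```java
/** Hypothetical helper encoding the sizing rules of thumb from this section. */
final class PoolSizingSketch {
    /** Pool size and operation timeout for one workload tier. */
    record Settings(int poolSize, long opTimeoutMillis) {}

    /** CPU cores * 2 for a standalone application, * 4 for a distributed one. */
    static int poolSize(int cpuCores, boolean distributed) {
        if (cpuCores < 1) {
            throw new IllegalArgumentException("cpuCores must be >= 1");
        }
        return cpuCores * (distributed ? 4 : 2);
    }

    /** Picks the small, medium, or large tier by expected queries per second. */
    static Settings forQps(long qps) {
        if (qps < 1000) {
            return new Settings(3, 5000);   // small application
        }
        if (qps <= 10000) {
            return new Settings(5, 3000);   // medium application
        }
        return new Settings(10, 2000);      // large application
    }
}
```

A caller could pass the result to the builder, for example `builder.setConnectionPoolSize(settings.poolSize())`, and then adjust after measuring real traffic.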

2.2.2 Connection Pool Monitoring

# Connection pool monitoring

# Xmemcached connection and server status
public class MemcachedMonitor {
    private final MemcachedClient client;

    public MemcachedMonitor(MemcachedClient client) {
        this.client = client;
    }

    public void printStats() {
        try {
            // Fetch statistics from every server
            Map<InetSocketAddress, Map<String, String>> stats =
                client.getStats();

            for (Map.Entry<InetSocketAddress, Map<String, String>> entry :
                    stats.entrySet()) {
                System.out.println("Server: " + entry.getKey());
                Map<String, String> serverStats = entry.getValue();

                // Key metrics
                System.out.println("  Current connections: " +
                    serverStats.get("curr_connections"));
                System.out.println("  Total connections: " +
                    serverStats.get("total_connections"));
                System.out.println("  Current items: " +
                    serverStats.get("curr_items"));
                System.out.println("  Memory used (bytes): " +
                    serverStats.get("bytes"));
                System.out.println("  Hits: " +
                    serverStats.get("get_hits"));
                System.out.println("  Misses: " +
                    serverStats.get("get_misses"));

                // Hit rate; guard against a server that has served no gets yet
                long hits = Long.parseLong(serverStats.get("get_hits"));
                long misses = Long.parseLong(serverStats.get("get_misses"));
                long total = hits + misses;
                double hitRate = total == 0 ? 0.0 : (double) hits / total * 100;
                System.out.println("  Hit rate: " + String.format("%.2f", hitRate) + "%");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2.3 Memcached Configuration Planning

2.3.1 Configuration File Planning

# Configuration file planning

# application.yml (Spring Boot)
memcached:
  servers:
    - host: 192.168.1.101
      port: 11211
    - host: 192.168.1.102
      port: 11211
    - host: 192.168.1.103
      port: 11211
    - host: 192.168.1.104
      port: 11211
  pool:
    size: 5
  timeout:
    connect: 3000
    op: 5000
  # These two keys bind to the sessionLocator and failureMode fields below
  session-locator: ketama
  failure-mode: true

# Configuration class (PoolConfig and TimeoutConfig are simple holder classes like ServerConfig)
@Configuration
@ConfigurationProperties(prefix = "memcached")
public class MemcachedProperties {
    private List<ServerConfig> servers;
    private PoolConfig pool;
    private TimeoutConfig timeout;
    private String sessionLocator = "ketama";
    private boolean failureMode = true;

    // getters and setters
}

# ServerConfig
public class ServerConfig {
    private String host;
    private int port = 11211;

    // getters and setters
}
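The YAML above lists servers as host/port pairs, while the client examples in this tutorial build their address list from a single "host:port host:port" string. A minimal sketch of that conversion follows; ServerListSketch and its Server record are hypothetical stand-ins for the ServerConfig class, kept separate so the snippet compiles on its own.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper that joins configured servers into one address string. */
final class ServerListSketch {
    /** Stand-in for the ServerConfig class shown above. */
    record Server(String host, int port) {}

    /** Produces "host:port host:port ...", rejecting empty lists and bad ports. */
    static String toAddressString(List<Server> servers) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("at least one server is required");
        }
        return servers.stream()
                .map(s -> {
                    if (s.port() < 1 || s.port() > 65535) {
                        throw new IllegalArgumentException("invalid port: " + s.port());
                    }
                    return s.host() + ":" + s.port();
                })
                .collect(Collectors.joining(" "));
    }
}
```

Validating the list at startup surfaces configuration mistakes immediately instead of at the first cache call.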

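2.3.2 Configuration Parameter Reference

# Configuration parameter reference

# Server settings
servers: list of Memcached servers
host: server IP address
port: server port (default 11211)

# Connection pool settings
pool:
size: connection pool size (5 in this configuration)

# Timeout settings
timeout:
connect: connect timeout (milliseconds, 3000 in this configuration)
op: operation timeout (milliseconds, 5000 in this configuration)

# Distribution strategy
session-locator: how keys are distributed across nodes
– ketama: consistent hashing (recommended)
– array: simple modulo

# Failure handling
failure-mode: whether to enable Xmemcached failure mode (true in this configuration)
– true: a failed node stays in the node list, and requests routed to it fail instead of being sent to another node (unless a standby node is configured)
– false: a failed node is removed from the node list, and its keys are redistributed to the remaining nodes

# Other settings
– sanitizeKeys: whether to encode keys so that special characters are safe to send
– commandFactory: command factory (default TextCommandFactory)
– transcoder: serializer (default SerializingTranscoder)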
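Related to key handling: the Memcached text protocol limits keys to 250 bytes and forbids whitespace and control characters. A small pre-check, sketched below, can reject bad keys before they reach the client; KeyCheckSketch is a hypothetical helper and not part of Xmemcached.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that checks keys against the text protocol's limits. */
final class KeyCheckSketch {
    static final int MAX_KEY_BYTES = 250;

    /** True if the key is non-empty, at most 250 bytes, and free of whitespace/control chars. */
    static boolean isValidKey(String key) {
        if (key == null || key.isEmpty()) {
            return false;
        }
        if (key.getBytes(StandardCharsets.UTF_8).length > MAX_KEY_BYTES) {
            return false;
        }
        for (int i = 0; i < key.length(); i++) {
            char c = key.charAt(i);
            // Everything up to and including the space character, plus DEL, is rejected.
            if (c <= ' ' || c == 0x7F) {
                return false;
            }
        }
        return true;
    }
}
```

Checking keys where they are built gives a clearer error than a protocol failure deep inside the client.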

Part03-Production Implementation

3.1 Xmemcached in Practice

3.1.1 Basic Configuration

# Xmemcached basic configuration

# Configuration class
package com.fgedu.cache.config;

import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.MemcachedClientBuilder;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.utils.AddrUtil;
import net.rubyeye.xmemcached.command.TextCommandFactory;
import net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MemcachedConfig {

    @Value("${memcached.servers}")
    private String servers;

    @Value("${memcached.pool.size:5}")
    private int poolSize;

    @Value("${memcached.timeout.connect:3000}")
    private long connectTimeout;

    @Value("${memcached.timeout.op:5000}")
    private long opTimeout;

    @Bean(destroyMethod = "shutdown")
    public MemcachedClient memcachedClient() throws Exception {
        // Create the builder from the space-separated server list
        MemcachedClientBuilder builder = new XMemcachedClientBuilder(
            AddrUtil.getAddresses(servers)
        );

        // Connection pool
        builder.setConnectionPoolSize(poolSize);

        // Timeouts (ms)
        builder.setConnectTimeout(connectTimeout);
        builder.setOpTimeout(opTimeout);

        // Consistent hashing
        builder.setSessionLocator(new KetamaMemcachedSessionLocator());

        // Command factory (text protocol)
        builder.setCommandFactory(new TextCommandFactory());

        // Failure mode
        builder.setFailureMode(true);

        return builder.build();
    }
}

# application.yml
memcached:
  servers: "192.168.1.101:11211 192.168.1.102:11211 192.168.1.103:11211 192.168.1.104:11211"
  pool:
    size: 5
  timeout:
    connect: 3000
    op: 5000

3.1.2 Basic Operations

# Xmemcached basic operations

package com.fgedu.cache.service;

import net.rubyeye.xmemcached.GetsResponse;
import net.rubyeye.xmemcached.MemcachedClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MemcachedService {

    @Autowired
    private MemcachedClient memcachedClient;

    /**
     * Store a value
     */
    public void set(String key, int exp, Object value) {
        try {
            memcachedClient.set(key, exp, value);
            System.out.println("Stored: " + key);
        } catch (Exception e) {
            System.err.println("Store failed: " + e.getMessage());
        }
    }

    /**
     * Retrieve a value
     */
    public <T> T get(String key) {
        try {
            T value = memcachedClient.get(key);
            if (value != null) {
                System.out.println("Found: " + key);
            } else {
                System.out.println("Not found: " + key);
            }
            return value;
        } catch (Exception e) {
            System.err.println("Get failed: " + e.getMessage());
            return null;
        }
    }

    /**
     * Delete a value
     */
    public boolean delete(String key) {
        try {
            boolean result = memcachedClient.delete(key);
            System.out.println("Delete " + (result ? "succeeded" : "failed") + ": " + key);
            return result;
        } catch (Exception e) {
            System.err.println("Delete failed: " + e.getMessage());
            return false;
        }
    }

    /**
     * Increment a counter; returns -1 on error
     */
    public long incr(String key, long delta) {
        try {
            long result = memcachedClient.incr(key, delta);
            System.out.println("Increment result: " + result);
            return result;
        } catch (Exception e) {
            System.err.println("Increment failed: " + e.getMessage());
            return -1;
        }
    }

    /**
     * CAS update
     */
    public <T> boolean cas(String key, int exp, T newValue) {
        try {
            // Fetch the current value together with its CAS token
            GetsResponse<T> casValue = memcachedClient.gets(key);

            if (casValue == null) {
                System.out.println("Key does not exist, cannot CAS");
                return false;
            }

            // Try to update
            boolean result = memcachedClient.cas(key, exp, newValue,
                casValue.getCas());
            System.out.println("CAS " + (result ? "succeeded" : "failed"));
            return result;
        } catch (Exception e) {
            System.err.println("CAS failed: " + e.getMessage());
            return false;
        }
    }
}

# Usage example (User is an application class that must be Serializable)
@Service
public class UserService {

    @Autowired
    private MemcachedService cacheService;

    public void cacheUser() {
        // Store a user
        User user = new User(1001, "fgedu", "fgedu@fgedu.net.cn");
        cacheService.set("fgedu_user:1001", 3600, user);

        // Retrieve the user
        User cachedUser = cacheService.get("fgedu_user:1001");
        System.out.println("User: " + cachedUser);

        // Update the user (CAS); skip if the entry is missing
        if (cachedUser != null) {
            cachedUser.setEmail("newemail@fgedu.net.cn");
            cacheService.cas("fgedu_user:1001", 3600, cachedUser);
        }

        // Delete the user
        cacheService.delete("fgedu_user:1001");
    }
}

3.2 Spring Integration in Practice

3.2.1 Spring Boot Integration

# Spring Boot integration

# 1. Add the dependency
<dependency>
    <groupId>com.googlecode.xmemcached</groupId>
    <artifactId>xmemcached</artifactId>
    <version>2.4.7</version>
</dependency>

# 2. Configuration class
package com.fgedu.cache.config;

import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.MemcachedClientBuilder;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.utils.AddrUtil;
import net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableConfigurationProperties(MemcachedProperties.class)
public class MemcachedAutoConfiguration {

    // shutdown() is called when the Spring context closes
    @Bean(destroyMethod = "shutdown")
    public MemcachedClient memcachedClient(MemcachedProperties properties)
            throws Exception {
        MemcachedClientBuilder builder = new XMemcachedClientBuilder(
            AddrUtil.getAddresses(properties.getServers())
        );

        builder.setConnectionPoolSize(properties.getPool().getSize());
        builder.setConnectTimeout(properties.getTimeout().getConnect());
        builder.setOpTimeout(properties.getTimeout().getOp());
        builder.setSessionLocator(new KetamaMemcachedSessionLocator());
        builder.setFailureMode(properties.isFailureMode());

        return builder.build();
    }
}

# 3. Properties class
package com.fgedu.cache.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "memcached")
public class MemcachedProperties {
    // Space-separated "host:port" entries
    private String servers;
    private PoolConfig pool = new PoolConfig();
    private TimeoutConfig timeout = new TimeoutConfig();
    private boolean failureMode = true;

    // getters and setters

    public static class PoolConfig {
        private int size = 5;
        // getter and setter
    }

    public static class TimeoutConfig {
        private long connect = 3000;
        private long op = 5000;
        // getters and setters
    }
}

3.2.2 Cache Abstraction Layer

# Cache abstraction layer

package com.fgedu.cache.service;

import net.rubyeye.xmemcached.MemcachedClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;

@Service
public class CacheService {

    private static final Logger logger = LoggerFactory.getLogger(CacheService.class);

    @Autowired
    private MemcachedClient memcachedClient;

    // Default expiry (1 hour)
    private static final int DEFAULT_EXPIRE = 3600;

    /**
     * Store a value
     */
    public <T> boolean put(String key, T value) {
        return put(key, DEFAULT_EXPIRE, value);
    }

    public <T> boolean put(String key, int expire, T value) {
        try {
            memcachedClient.set(key, expire, value);
            logger.debug("Cache put: {}", key);
            return true;
        } catch (Exception e) {
            logger.error("Cache put error: {}", key, e);
            return false;
        }
    }

    /**
     * Retrieve a value
     */
    public <T> T get(String key) {
        try {
            T value = memcachedClient.get(key);
            logger.debug("Cache get: {}, hit: {}", key, value != null);
            return value;
        } catch (Exception e) {
            logger.error("Cache get error: {}", key, e);
            return null;
        }
    }

    /**
     * Retrieve a value, loading and caching it on a miss
     */
    public <T> T getOrLoad(String key, int expire, CacheLoader<T> loader) {
        T value = get(key);
        if (value == null) {
            value = loader.load();
            if (value != null) {
                put(key, expire, value);
            }
        }
        return value;
    }

    /**
     * Delete a value
     */
    public boolean delete(String key) {
        try {
            boolean result = memcachedClient.delete(key);
            logger.debug("Cache delete: {}, result: {}", key, result);
            return result;
        } catch (Exception e) {
            logger.error("Cache delete error: {}", key, e);
            return false;
        }
    }

    /**
     * Bulk get
     */
    public <T> Map<String, T> multiGet(Collection<String> keys) {
        try {
            Map<String, T> result = memcachedClient.get(keys);
            logger.debug("Cache multiGet: {} keys, {} hits",
                keys.size(), result.size());
            return result;
        } catch (Exception e) {
            logger.error("Cache multiGet error", e);
            return Collections.emptyMap();
        }
    }

    /**
     * Asynchronous-style store: sends the command without waiting for a reply
     */
    public <T> void asyncPut(String key, int expire, T value) {
        try {
            memcachedClient.setWithNoReply(key, expire, value);
            logger.debug("Cache asyncPut: {}", key);
        } catch (Exception e) {
            logger.error("Cache asyncPut error: {}", key, e);
        }
    }

    /**
     * Cache loader interface
     */
    @FunctionalInterface
    public interface CacheLoader<T> {
        T load();
    }
}

3.3 Cluster Configuration in Practice

3.3.1 Cluster Configuration

# Cluster configuration

package com.fgedu.cache.cluster;

import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.MemcachedClientBuilder;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.utils.AddrUtil;
import net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.*;

@Component
public class MemcachedClusterConfig {

    // Cluster nodes (space-separated)
    private static final String CLUSTER_NODES =
        "192.168.1.101:11211 " +
        "192.168.1.102:11211 " +
        "192.168.1.103:11211 " +
        "192.168.1.104:11211";

    // Virtual nodes per server; informational only, not passed to the locator below
    private static final int VIRTUAL_NODES = 160;

    /**
     * Create the cluster client
     */
    public MemcachedClient createClusterClient() throws Exception {
        MemcachedClientBuilder builder = new XMemcachedClientBuilder(
            AddrUtil.getAddresses(CLUSTER_NODES)
        );

        // Consistent hashing
        builder.setSessionLocator(new KetamaMemcachedSessionLocator());

        // Connection pool
        builder.setConnectionPoolSize(5);

        // Timeouts (ms)
        builder.setConnectTimeout(3000);
        builder.setOpTimeout(5000);

        // Failure mode
        builder.setFailureMode(true);

        // Interval (ms) between attempts to reconnect failed sessions
        builder.setHealSessionInterval(5000);

        return builder.build();
    }

    /**
     * Add a node at runtime
     */
    public void addNode(MemcachedClient client, String host, int port)
            throws Exception {
        InetSocketAddress address = new InetSocketAddress(host, port);
        client.addServer(address);
        System.out.println("Added node: " + host + ":" + port);
    }

    /**
     * Remove a node at runtime
     */
    public void removeNode(MemcachedClient client, String host, int port)
            throws Exception {
        // removeServer accepts a "host:port" string
        client.removeServer(host + ":" + port);
        System.out.println("Removed node: " + host + ":" + port);
    }

    /**
     * Print cluster status
     */
    public void printClusterStatus(MemcachedClient client) throws Exception {
        Map<InetSocketAddress, Map<String, String>> stats = client.getStats();

        System.out.println("Cluster status:");
        System.out.println("====================================");

        for (Map.Entry<InetSocketAddress, Map<String, String>> entry :
                stats.entrySet()) {
            InetSocketAddress addr = entry.getKey();
            Map<String, String> serverStats = entry.getValue();

            System.out.println("Node: " + addr.getHostString() + ":" + addr.getPort());
            System.out.println("  Version: " + serverStats.get("version"));
            System.out.println("  Uptime: " + serverStats.get("uptime") + " s");
            System.out.println("  Current items: " + serverStats.get("curr_items"));
            System.out.println("  Memory used: " + formatBytes(
                Long.parseLong(serverStats.get("bytes"))));
            System.out.println("  Hit rate: " + calculateHitRate(serverStats) + "%");
            System.out.println();
        }
    }

    private String formatBytes(long bytes) {
        if (bytes < 1024) return bytes + " B";
        if (bytes < 1024 * 1024) return (bytes / 1024) + " KB";
        if (bytes < 1024 * 1024 * 1024) return (bytes / 1024 / 1024) + " MB";
        return (bytes / 1024 / 1024 / 1024) + " GB";
    }

    private String calculateHitRate(Map<String, String> stats) {
        long hits = Long.parseLong(stats.getOrDefault("get_hits", "0"));
        long misses = Long.parseLong(stats.getOrDefault("get_misses", "0"));
        if (hits + misses == 0) return "0.00";
        return String.format("%.2f", (double) hits / (hits + misses) * 100);
    }
}
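Adding or removing nodes at runtime is where the choice of distribution strategy matters. As a rough illustration of why simple modulo distribution is costly here, the sketch below counts how many keys change node when a cluster grows. RemapSketch is a hypothetical helper, and using the numeric key itself as the hash is a simplifying assumption.

```java
/** Hypothetical helper measuring key movement under modulo distribution. */
final class RemapSketch {
    /** Fraction of keys 0..keys-1 whose node index changes between two cluster sizes. */
    static double moduloRemapFraction(int keys, int oldNodes, int newNodes) {
        int moved = 0;
        for (int k = 0; k < keys; k++) {
            // A key stays put only if both cluster sizes map it to the same index.
            if (k % oldNodes != k % newNodes) {
                moved++;
            }
        }
        return (double) moved / keys;
    }
}
```

Going from 4 to 5 nodes moves 80% of these keys under modulo distribution, which in a cache means most entries are effectively lost at once. Consistent hashing aims to move only about the share of keys that the new node takes over.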

3.3.2 Failover Configuration

# Failover configuration

package com.fgedu.cache.failover;

import net.rubyeye.xmemcached.MemcachedClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.*;

@Component
public class FailoverManager {

    private static final Logger logger = LoggerFactory.getLogger(FailoverManager.class);

    private final MemcachedClient client;
    private final ScheduledExecutorService scheduler;
    private final Set<InetSocketAddress> failedNodes =
        ConcurrentHashMap.newKeySet();

    public FailoverManager(MemcachedClient client) {
        this.client = client;
        this.scheduler = Executors.newScheduledThreadPool(1);
        startHealthCheck();
    }

    /**
     * Start the periodic health check
     */
    private void startHealthCheck() {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                checkNodes();
            } catch (Exception e) {
                logger.error("Health check failed", e);
            }
        }, 0, 10, TimeUnit.SECONDS);
    }

    /**
     * Check the health of every node.
     * Only nodes listed by getStats() are checked; a node missing from that
     * result is not examined here.
     */
    private void checkNodes() throws Exception {
        Map<InetSocketAddress, Map<String, String>> stats = client.getStats();

        for (InetSocketAddress addr : stats.keySet()) {
            boolean isHealthy = checkNodeHealth(addr);

            if (!isHealthy && !failedNodes.contains(addr)) {
                failedNodes.add(addr);
                logger.warn("Node failed: {}", addr);
                onNodeFailure(addr);
            } else if (isHealthy && failedNodes.contains(addr)) {
                failedNodes.remove(addr);
                logger.info("Node recovered: {}", addr);
                onNodeRecovery(addr);
            }
        }
    }

    /**
     * Check the health of a single node
     */
    private boolean checkNodeHealth(InetSocketAddress addr) {
        try {
            // stats(address) queries one server
            Map<String, String> stats = client.stats(addr);
            return stats != null && !stats.isEmpty();
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * Handle a node failure
     */
    private void onNodeFailure(InetSocketAddress addr) {
        // Send an alert
        sendAlert("Node failed: " + addr);

        // Automatic removal of the node could be implemented here
    }

    /**
     * Handle a node recovery
     */
    private void onNodeRecovery(InetSocketAddress addr) {
        // Send a notification
        sendAlert("Node recovered: " + addr);

        // Warm up the node
        warmupNode(addr);
    }

    /**
     * Warm up a node
     */
    private void warmupNode(InetSocketAddress addr) {
        // Load hot data into the recovered node
        logger.info("Warming up node: {}", addr);
    }

    /**
     * Send an alert
     */
    private void sendAlert(String message) {
        logger.warn("Alert: {}", message);
        // Implement email/SMS alerting here
    }

    /**
     * Shut down the health check scheduler
     */
    public void shutdown() {
        scheduler.shutdown();
    }
}
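The failure and recovery detection above boils down to tracking state transitions per node. A minimal sketch of that bookkeeping, independent of any client library, might look like this; NodeStateTrackerSketch and its Transition enum are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical tracker that turns health-check results into state transitions. */
final class NodeStateTrackerSketch {
    enum Transition { FAILED, RECOVERED, UNCHANGED }

    // Nodes currently considered down.
    private final Set<String> failedNodes = ConcurrentHashMap.newKeySet();

    /** Records one health-check result and reports whether the node changed state. */
    Transition observe(String node, boolean healthy) {
        if (!healthy) {
            // add() returns true only the first time, so repeated failures do not re-alert.
            return failedNodes.add(node) ? Transition.FAILED : Transition.UNCHANGED;
        }
        // remove() returns true only if the node was previously marked as failed.
        return failedNodes.remove(node) ? Transition.RECOVERED : Transition.UNCHANGED;
    }
}
```

Relying on the return values of add and remove keeps each check-and-update atomic, so an alert fires once per transition even if checks run concurrently.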

Part04-Production Cases and Walkthroughs

4.1 Cache Service Wrapper Case

The following is a complete example of a cache service wrapper.

4.1.1 Full Cache Service Implementation

# Full cache service implementation

package com.fgedu.cache.service;

import net.rubyeye.xmemcached.MemcachedClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.Serializable;
import java.util.*;

@Service
public class DistributedCacheService {

    private static final Logger logger =
        LoggerFactory.getLogger(DistributedCacheService.class);

    @Autowired
    private MemcachedClient memcachedClient;

    // Key prefix
    private static final String KEY_PREFIX = "fgedu_";

    // Default expiry (seconds)
    private static final int DEFAULT_EXPIRE = 3600;

    // Expiry of the "not found" marker (seconds)
    private static final int NULL_EXPIRE = 60;

    /**
     * Build the cache key
     */
    private String buildKey(String namespace, String key) {
        return KEY_PREFIX + namespace + ":" + key;
    }

    /**
     * Store a value
     */
    public <T> boolean set(String namespace, String key, T value) {
        return set(namespace, key, DEFAULT_EXPIRE, value);
    }

    public <T> boolean set(String namespace, String key, int expire, T value) {
        String cacheKey = buildKey(namespace, key);
        try {
            memcachedClient.set(cacheKey, expire, value);
            logger.debug("Set cache: {}", cacheKey);
            return true;
        } catch (Exception e) {
            logger.error("Set cache error: {}", cacheKey, e);
            return false;
        }
    }

    /**
     * Retrieve a value; a cached "not found" marker is reported as null
     */
    @SuppressWarnings("unchecked")
    public <T> T get(String namespace, String key) {
        String cacheKey = buildKey(namespace, key);
        try {
            Object cached = memcachedClient.get(cacheKey);
            logger.debug("Get cache: {}, hit: {}", cacheKey, cached != null);
            return cached instanceof NullValue ? null : (T) cached;
        } catch (Exception e) {
            logger.error("Get cache error: {}", cacheKey, e);
            return null;
        }
    }

    /**
     * Get or load (with cache penetration protection)
     */
    @SuppressWarnings("unchecked")
    public <T> T getOrLoad(String namespace, String key, int expire,
            CacheLoader<T> loader) {
        String cacheKey = buildKey(namespace, key);

        try {
            Object cached = memcachedClient.get(cacheKey);
            if (cached instanceof NullValue) {
                // Cached "not found" marker: skip the loader
                logger.debug("Cache hit (null marker): {}", cacheKey);
                return null;
            }
            if (cached != null) {
                logger.debug("Cache hit: {}", cacheKey);
                return (T) cached;
            }
        } catch (Exception e) {
            logger.error("Cache read error: {}", cacheKey, e);
        }

        // Cache miss or cache unavailable: load from the source exactly once
        logger.debug("Cache miss: {}", cacheKey);
        T value = loader.load();

        try {
            if (value != null) {
                memcachedClient.set(cacheKey, expire, value);
            } else {
                // Store a short-lived marker to limit cache penetration
                memcachedClient.set(cacheKey, NULL_EXPIRE, new NullValue());
            }
        } catch (Exception e) {
            logger.error("Cache write error: {}", cacheKey, e);
        }
        return value;
    }

    /**
     * Delete a value
     */
    public boolean delete(String namespace, String key) {
        String cacheKey = buildKey(namespace, key);
        try {
            boolean result = memcachedClient.delete(cacheKey);
            logger.debug("Delete cache: {}, result: {}", cacheKey, result);
            return result;
        } catch (Exception e) {
            logger.error("Delete cache error: {}", cacheKey, e);
            return false;
        }
    }

    /**
     * Bulk get; returns a mutable map keyed by the original (unprefixed) keys
     */
    @SuppressWarnings("unchecked")
    public <T> Map<String, T> multiGet(String namespace,
            Collection<String> keys) {
        List<String> cacheKeys = new ArrayList<>();
        for (String key : keys) {
            cacheKeys.add(buildKey(namespace, key));
        }

        Map<String, T> finalResult = new HashMap<>();
        try {
            Map<String, Object> result = memcachedClient.get(cacheKeys);

            // Strip the prefix and skip "not found" markers
            int prefixLength = (KEY_PREFIX + namespace + ":").length();
            for (Map.Entry<String, Object> entry : result.entrySet()) {
                if (entry.getValue() instanceof NullValue) {
                    continue;
                }
                finalResult.put(entry.getKey().substring(prefixLength),
                    (T) entry.getValue());
            }
        } catch (Exception e) {
            logger.error("Multi get error", e);
        }
        return finalResult;
    }

    /**
     * Counter; starts at 0 if the key does not exist, returns -1 on error
     */
    public long increment(String namespace, String key, long delta) {
        String cacheKey = buildKey(namespace, key);
        try {
            return memcachedClient.incr(cacheKey, delta, 0);
        } catch (Exception e) {
            logger.error("Increment error: {}", cacheKey, e);
            return -1;
        }
    }

    /**
     * Cache loader interface
     */
    @FunctionalInterface
    public interface CacheLoader<T> {
        T load();
    }

    /**
     * Marker stored in place of a null result
     */
    private static class NullValue implements Serializable {
        private static final long serialVersionUID = 1L;
    }
}
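The null-marker idea used for cache penetration protection can be shown in isolation. The sketch below uses an in-memory map as a stand-in for Memcached and has no expiry, so it only illustrates the control flow; NegativeCacheSketch is a hypothetical class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical in-memory illustration of negative caching (no TTL handling). */
final class NegativeCacheSketch {
    // Stored in place of null so "known to be missing" differs from "not cached yet".
    private static final Object NULL_MARKER = new Object();

    private final Map<String, Object> cache = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    <T> T getOrLoad(String key, Supplier<T> loader) {
        Object cached = cache.get(key);
        if (cached == NULL_MARKER) {
            return null;            // negative hit: the loader is not called again
        }
        if (cached != null) {
            return (T) cached;      // positive hit
        }
        T loaded = loader.get();
        cache.put(key, loaded != null ? loaded : NULL_MARKER);
        return loaded;
    }
}
```

With a real Memcached server the marker is serialized and deserialized, so it has to be recognized by type (for example with instanceof) rather than by object identity, and it should carry a short expiry so that newly created records become visible.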

4.1.2 Usage Example

# Usage example (assumes a Spring Data style UserRepository whose findById returns Optional)

package com.fgedu.service;

import com.fgedu.cache.service.DistributedCacheService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;

@Service
public class UserService {

    @Autowired
    private DistributedCacheService cacheService;

    @Autowired
    private UserRepository userRepository;

    /**
     * Get a user (with caching)
     */
    public User getUser(Long userId) {
        return cacheService.getOrLoad("user", userId.toString(), 3600,
            () -> userRepository.findById(userId).orElse(null));
    }

    /**
     * Update a user (and refresh the cache)
     */
    public void updateUser(User user) {
        userRepository.save(user);
        cacheService.set("user", user.getId().toString(), 3600, user);
    }

    /**
     * Delete a user (and clear the cache)
     */
    public void deleteUser(Long userId) {
        userRepository.deleteById(userId);
        cacheService.delete("user", userId.toString());
    }

    /**
     * Bulk get users from the cache
     */
    public Map<String, User> multiGetUsers(List<Long> userIds) {
        List<String> keys = new ArrayList<>();
        for (Long id : userIds) {
            keys.add(id.toString());
        }
        return cacheService.multiGet("user", keys);
    }

    /**
     * Count user visits
     */
    public long incrementVisitCount(Long userId) {
        return cacheService.increment("visit", userId.toString(), 1);
    }
}

4.2 Session Cache Case

4.2.1 Session Storage Implementation

# Session storage implementation

package com.fgedu.session;

import net.rubyeye.xmemcached.MemcachedClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.session.Session;
import org.springframework.session.SessionRepository;
import org.springframework.stereotype.Repository;
import java.time.Duration;
import java.time.Instant;
import java.util.*;

@Repository
public class MemcachedSessionRepository implements SessionRepository<MemcachedSession> {

    @Autowired
    private MemcachedClient memcachedClient;

    private static final String SESSION_PREFIX = "fgedu_session:";
    private static final int DEFAULT_MAX_INACTIVE_INTERVAL = 1800; // 30 minutes

    @Override
    public MemcachedSession createSession() {
        MemcachedSession session = new MemcachedSession();
        session.setMaxInactiveInterval(Duration.ofSeconds(DEFAULT_MAX_INACTIVE_INTERVAL));
        return session;
    }

    @Override
    public void save(MemcachedSession session) {
        String key = SESSION_PREFIX + session.getId();
        int expire = (int) session.getMaxInactiveInterval().getSeconds();

        try {
            // Copy the attributes by name; getAttributeNames() returns only the names
            Map<String, Object> attributes = new HashMap<>();
            for (String name : session.getAttributeNames()) {
                attributes.put(name, session.getAttribute(name));
            }

            Map<String, Object> sessionData = new HashMap<>();
            sessionData.put("id", session.getId());
            sessionData.put("creationTime", session.getCreationTime().toEpochMilli());
            sessionData.put("lastAccessedTime", session.getLastAccessedTime().toEpochMilli());
            sessionData.put("maxInactiveInterval", session.getMaxInactiveInterval().getSeconds());
            sessionData.put("attributes", attributes);

            memcachedClient.set(key, expire, sessionData);
        } catch (Exception e) {
            throw new RuntimeException("Failed to save session", e);
        }
    }

    @Override
    public MemcachedSession findById(String id) {
        String key = SESSION_PREFIX + id;

        try {
            Map<String, Object> sessionData = memcachedClient.get(key);

            if (sessionData == null) {
                return null;
            }

            MemcachedSession session = new MemcachedSession(id);
            session.setCreationTime(Instant.ofEpochMilli(
                (Long) sessionData.get("creationTime")));
            session.setLastAccessedTime(Instant.ofEpochMilli(
                (Long) sessionData.get("lastAccessedTime")));
            session.setMaxInactiveInterval(Duration.ofSeconds(
                (Long) sessionData.get("maxInactiveInterval")));

            @SuppressWarnings("unchecked")
            Map<String, Object> attributes =
                (Map<String, Object>) sessionData.get("attributes");
            for (Map.Entry<String, Object> entry : attributes.entrySet()) {
                session.setAttribute(entry.getKey(), entry.getValue());
            }

            return session;
        } catch (Exception e) {
            throw new RuntimeException("Failed to load session", e);
        }
    }

    @Override
    public void deleteById(String id) {
        String key = SESSION_PREFIX + id;

        try {
            memcachedClient.delete(key);
        } catch (Exception e) {
            throw new RuntimeException("Failed to delete session", e);
        }
    }
}

# MemcachedSession class
public class MemcachedSession implements Session {
    private String id;
    private Instant creationTime;
    private Instant lastAccessedTime;
    private Duration maxInactiveInterval;
    private Map<String, Object> attributes = new HashMap<>();

    public MemcachedSession() {
        this.id = UUID.randomUUID().toString();
        this.creationTime = Instant.now();
        this.lastAccessedTime = this.creationTime;
    }

    public MemcachedSession(String id) {
        this.id = id;
    }

    // Implement the Session interface methods…
}
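One detail worth keeping in mind when the session timeout becomes the Memcached expiry: Memcached treats expiry values up to 30 days as relative seconds and anything larger as an absolute Unix timestamp. A minimal conversion sketch follows; ExpirySketch and toExptime are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper converting a TTL into a Memcached expiry value. */
final class ExpirySketch {
    // Largest value Memcached still interprets as a relative number of seconds.
    static final long THIRTY_DAYS_SECONDS = 60L * 60 * 24 * 30;

    /** Relative seconds for TTLs up to 30 days, absolute Unix time beyond that. */
    static int toExptime(Duration ttl, Instant now) {
        long seconds = ttl.getSeconds();
        if (seconds <= 0) {
            throw new IllegalArgumentException("ttl must be positive");
        }
        long exptime = seconds <= THIRTY_DAYS_SECONDS
                ? seconds
                : now.getEpochSecond() + seconds;
        // Fails loudly instead of silently overflowing the int expiry field.
        return Math.toIntExact(exptime);
    }
}
```

For the 30-minute default used above, the value passes through unchanged as 1800. Only very long-lived sessions need the absolute form.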

4.3 Data Cache Case

4.3.1 Product Cache Case

# Product cache case (assumes a Spring Data style ProductRepository: findById returns Optional, findAllById returns a List)

package com.fgedu.product.service;

import com.fgedu.cache.service.DistributedCacheService;
import com.fgedu.product.entity.Product;
import com.fgedu.product.repository.ProductRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.*;

@Service
public class ProductService {

    @Autowired
    private DistributedCacheService cacheService;

    @Autowired
    private ProductRepository productRepository;

    // Cache expiry: 5 minutes
    private static final int CACHE_EXPIRE = 300;

    // Cache expiry for hot products: 30 minutes
    private static final int HOT_CACHE_EXPIRE = 1800;

    // IDs of hot products
    private Set<Long> hotProductIds = ConcurrentHashMap.newKeySet();

    /**
     * Get a product
     */
    public Product getProduct(Long productId) {
        int expire = hotProductIds.contains(productId) ?
            HOT_CACHE_EXPIRE : CACHE_EXPIRE;

        return cacheService.getOrLoad("product", productId.toString(), expire,
            () -> productRepository.findById(productId).orElse(null));
    }

    /**
     * Update a product
     */
    public void updateProduct(Product product) {
        productRepository.save(product);

        // Refresh the cache
        int expire = hotProductIds.contains(product.getId()) ?
            HOT_CACHE_EXPIRE : CACHE_EXPIRE;
        cacheService.set("product", product.getId().toString(), expire, product);
    }

    /**
     * Delete a product
     */
    public void deleteProduct(Long productId) {
        productRepository.deleteById(productId);

        // Clear the cache
        cacheService.delete("product", productId.toString());
        hotProductIds.remove(productId);
    }

    /**
     * Bulk get products
     */
    public Map<String, Product> multiGetProducts(List<Long> productIds) {
        List<String> keys = new ArrayList<>();
        for (Long id : productIds) {
            keys.add(id.toString());
        }

        // Copy into a mutable map so that missed products can be added below
        Map<String, Product> cachedProducts =
            new HashMap<>(cacheService.<Product>multiGet("product", keys));

        // Find the products that were not in the cache
        List<Long> missedIds = new ArrayList<>();
        for (Long id : productIds) {
            if (!cachedProducts.containsKey(id.toString())) {
                missedIds.add(id);
            }
        }

        // Load the missed products
        if (!missedIds.isEmpty()) {
            List<Product> products = productRepository.findAllById(missedIds);
            for (Product product : products) {
                cachedProducts.put(product.getId().toString(), product);

                // Write to the cache
                int expire = hotProductIds.contains(product.getId()) ?
                    HOT_CACHE_EXPIRE : CACHE_EXPIRE;
                cacheService.set("product", product.getId().toS

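This article was compiled and published by 风哥教程 for learning and testing purposes only. Please credit the source when reposting: http://www.fgedu.net.cn/10327.html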
