Compare commits

...

2 Commits

Author SHA1 Message Date
e6e1bee8df Cache的类型转换的更改优化 2025-07-01 12:07:00 +08:00
d3c81412b9 Cache的类型转换的更改优化 2025-07-01 09:27:28 +08:00
2 changed files with 60 additions and 18 deletions

View File

@ -1,12 +1,16 @@
package com.bipt.intelligentapplicationorchestrationservice.controller; package com.bipt.intelligentapplicationorchestrationservice.controller;
import com.bipt.intelligentapplicationorchestrationservice.config.IpConfig; import com.bipt.intelligentapplicationorchestrationservice.config.IpConfig;
import com.bipt.intelligentapplicationorchestrationservice.entity.DeployRequest;
import com.bipt.intelligentapplicationorchestrationservice.mapper.ModelMapper;
import com.bipt.intelligentapplicationorchestrationservice.pojo.*; import com.bipt.intelligentapplicationorchestrationservice.pojo.*;
import com.bipt.intelligentapplicationorchestrationservice.service.ModelDeployer;
import com.bipt.intelligentapplicationorchestrationservice.service.PublishService; import com.bipt.intelligentapplicationorchestrationservice.service.PublishService;
import com.bipt.intelligentapplicationorchestrationservice.util.NacosServiceUtil; import com.bipt.intelligentapplicationorchestrationservice.util.NacosServiceUtil;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@ -27,17 +31,29 @@ public class PublishController {
@Autowired @Autowired
private NacosServiceUtil nacosServiceUtil; private NacosServiceUtil nacosServiceUtil;
@Autowired
private ModelMapper modelMapper;
@Autowired @Autowired
private IpConfig ipConfig; private IpConfig ipConfig;
@Autowired
private ModelDeployer modelDeployer;
@PostMapping @PostMapping
@Operation(summary ="新增发布请求") @Operation(summary ="新增发布请求")
@Transactional @Transactional
public OptResult<List<ServicePublishVO>> save(@RequestBody ServicePublishDTO servicePublishDTO) { public OptResult<List<ServicePublishVO>> save(@RequestBody ServicePublishDTO servicePublishDTO) {
log.info("模型发布请求:{}", servicePublishDTO); log.info("模型发布请求:{}", servicePublishDTO);
publishService.save(servicePublishDTO); publishService.save(servicePublishDTO);
//todo 调用模型部署 //调用模型部署
DeployRequest request = new DeployRequest();
Long modelId = servicePublishDTO.getModelId();
ModelVersion modelVersion = modelMapper.selectById(modelId);
String modelConfig = modelVersion.getModelConfig();
//假设modelConfig只存GPU数据
request.setModelId(String.valueOf(modelId));
request.setRequiredMemory(Integer.parseInt(modelConfig));
modelDeployer.deploy(request);
// 获取前端传来的IP字符串 // 获取前端传来的IP字符串
String ipListStr = servicePublishDTO.getIp(); String ipListStr = servicePublishDTO.getIp();
if (ipListStr == null || ipListStr.trim().isEmpty()) { if (ipListStr == null || ipListStr.trim().isEmpty()) {

View File

@ -3,8 +3,10 @@ package com.bipt.intelligentapplicationorchestrationservice.service;
import com.bipt.intelligentapplicationorchestrationservice.mapper.GpuResourceDao; import com.bipt.intelligentapplicationorchestrationservice.mapper.GpuResourceDao;
import com.bipt.intelligentapplicationorchestrationservice.exception.CacheInitException; import com.bipt.intelligentapplicationorchestrationservice.exception.CacheInitException;
import com.bipt.intelligentapplicationorchestrationservice.entity.GpuResource; import com.bipt.intelligentapplicationorchestrationservice.entity.GpuResource;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.RedisConnectionFailureException; import org.springframework.data.redis.RedisConnectionFailureException;
@ -18,7 +20,6 @@ import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@Transactional // 添加类级别事务管理
@Component @Component
public class CacheManager { public class CacheManager {
@Autowired @Autowired
@ -27,6 +28,9 @@ public class CacheManager {
@Autowired @Autowired
private GpuResourceDao gpuResourceDao; private GpuResourceDao gpuResourceDao;
@Autowired
private ObjectMapper objectMapper; // 注入ObjectMapper用于类型转换
private final ReentrantLock lock = new ReentrantLock(); private final ReentrantLock lock = new ReentrantLock();
@Value("${cache.redis-key-prefix:gpu:}") @Value("${cache.redis-key-prefix:gpu:}")
@ -38,9 +42,9 @@ public class CacheManager {
@Value("${cache.init-batch-size:500}") @Value("${cache.init-batch-size:500}")
private int initBatchSize; private int initBatchSize;
private static final Logger log = org.slf4j.LoggerFactory.getLogger(CacheManager.class); private static final Logger log = LoggerFactory.getLogger(CacheManager.class);
// 全量加载(带分页和分布式锁) // 全量加载(带分页和分布式锁)
@Transactional(propagation = Propagation.REQUIRED) // 方法级别覆盖
@PostConstruct @PostConstruct
public void loadFullCache() { public void loadFullCache() {
if (tryLock()) { if (tryLock()) {
@ -82,16 +86,12 @@ public class CacheManager {
// 带随机TTL的缓存设置 // 带随机TTL的缓存设置
private void setCacheWithTTL(GpuResource entity) { private void setCacheWithTTL(GpuResource entity) {
String key = buildKey(entity.getGPUId().toString()); String key = buildKey(entity.getGPUId().toString());
GpuResource cached = (GpuResource) redisTemplate.opsForValue().get(key);
// 保留原有内存字段值 // 直接存储实体对象,确保类型一致性
if (cached != null && cached.getGPUMemorySize() != null) {
entity.setGPUMemorySize(cached.getGPUMemorySize());
}
redisTemplate.opsForValue().set( redisTemplate.opsForValue().set(
key, key,
entity, entity,
ttlBase + (int)(Math.random() * 600), // 随机TTL防止雪崩 ttlBase + (int)(Math.random() * 600),
TimeUnit.SECONDS TimeUnit.SECONDS
); );
} }
@ -114,6 +114,7 @@ public class CacheManager {
private void unlock() { private void unlock() {
lock.unlock(); lock.unlock();
} }
// 分页加载入口 // 分页加载入口
public void loadFullCache(int batchSize) { public void loadFullCache(int batchSize) {
int page = 0; int page = 0;
@ -121,12 +122,11 @@ public class CacheManager {
List<GpuResource> batch = gpuResourceDao.findByPage(page * batchSize, batchSize); List<GpuResource> batch = gpuResourceDao.findByPage(page * batchSize, batchSize);
if (batch.isEmpty()) break; if (batch.isEmpty()) break;
batch.forEach(this::refreshWithRetry); // 带重试的刷新逻辑 batch.forEach(this::refreshWithRetry);
page++; page++;
} }
} }
// 带重试机制的缓存刷新 // 带重试机制的缓存刷新
public void refreshWithRetry(GpuResource entity) { public void refreshWithRetry(GpuResource entity) {
try { try {
@ -135,7 +135,7 @@ public class CacheManager {
// 3次重试逻辑 // 3次重试逻辑
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
try { try {
log.info("重试第 {} 次", i + 1); // 添加日志 log.info("重试第 {} 次", i + 1);
Thread.sleep(1000); Thread.sleep(1000);
setCacheWithTTL(entity); setCacheWithTTL(entity);
return; return;
@ -148,7 +148,6 @@ public class CacheManager {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
} }
} }
@ -162,8 +161,35 @@ public class CacheManager {
redisTemplate.delete(key); redisTemplate.delete(key);
} }
// 修改获取缓存的方法,增加类型安全处理
@SuppressWarnings("unchecked")
public GpuResource getFromCache(String gpuId) { public GpuResource getFromCache(String gpuId) {
return (GpuResource) redisTemplate.opsForValue().get("gpu:" + gpuId); String key = buildKey(gpuId);
Object value = redisTemplate.opsForValue().get(key);
// 处理可能的类型不匹配问题
if (value == null) {
return null;
} }
try {
// 优先尝试直接转换
if (value instanceof GpuResource) {
return (GpuResource) value;
}
// 如果是LinkedHashMap使用ObjectMapper转换
else if (value instanceof java.util.LinkedHashMap) {
return objectMapper.convertValue(value, GpuResource.class);
}
// 其他情况尝试序列化后反序列化适用于JSON存储场景
else {
// 先序列化为JSON字符串再反序列化为对象
String json = objectMapper.writeValueAsString(value);
return objectMapper.readValue(json, GpuResource.class);
}
} catch (Exception e) {
log.error("获取缓存时类型转换失败key: {}, valueType: {}", key, value.getClass().getName(), e);
return null;
}
}
} }