Compare commits
2 Commits
808d285888
...
e6e1bee8df
Author | SHA1 | Date | |
---|---|---|---|
e6e1bee8df | |||
d3c81412b9 |
@ -1,12 +1,16 @@
|
|||||||
package com.bipt.intelligentapplicationorchestrationservice.controller;
|
package com.bipt.intelligentapplicationorchestrationservice.controller;
|
||||||
|
|
||||||
import com.bipt.intelligentapplicationorchestrationservice.config.IpConfig;
|
import com.bipt.intelligentapplicationorchestrationservice.config.IpConfig;
|
||||||
|
import com.bipt.intelligentapplicationorchestrationservice.entity.DeployRequest;
|
||||||
|
import com.bipt.intelligentapplicationorchestrationservice.mapper.ModelMapper;
|
||||||
import com.bipt.intelligentapplicationorchestrationservice.pojo.*;
|
import com.bipt.intelligentapplicationorchestrationservice.pojo.*;
|
||||||
|
import com.bipt.intelligentapplicationorchestrationservice.service.ModelDeployer;
|
||||||
import com.bipt.intelligentapplicationorchestrationservice.service.PublishService;
|
import com.bipt.intelligentapplicationorchestrationservice.service.PublishService;
|
||||||
import com.bipt.intelligentapplicationorchestrationservice.util.NacosServiceUtil;
|
import com.bipt.intelligentapplicationorchestrationservice.util.NacosServiceUtil;
|
||||||
import io.swagger.v3.oas.annotations.Operation;
|
import io.swagger.v3.oas.annotations.Operation;
|
||||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.BeanUtils;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
@ -27,17 +31,29 @@ public class PublishController {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private NacosServiceUtil nacosServiceUtil;
|
private NacosServiceUtil nacosServiceUtil;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ModelMapper modelMapper;
|
||||||
@Autowired
|
@Autowired
|
||||||
private IpConfig ipConfig;
|
private IpConfig ipConfig;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ModelDeployer modelDeployer;
|
||||||
|
|
||||||
@PostMapping
|
@PostMapping
|
||||||
@Operation(summary ="新增发布请求")
|
@Operation(summary ="新增发布请求")
|
||||||
@Transactional
|
@Transactional
|
||||||
public OptResult<List<ServicePublishVO>> save(@RequestBody ServicePublishDTO servicePublishDTO) {
|
public OptResult<List<ServicePublishVO>> save(@RequestBody ServicePublishDTO servicePublishDTO) {
|
||||||
log.info("模型发布请求:{}", servicePublishDTO);
|
log.info("模型发布请求:{}", servicePublishDTO);
|
||||||
publishService.save(servicePublishDTO);
|
publishService.save(servicePublishDTO);
|
||||||
//todo 调用模型部署
|
//调用模型部署
|
||||||
|
DeployRequest request = new DeployRequest();
|
||||||
|
Long modelId = servicePublishDTO.getModelId();
|
||||||
|
ModelVersion modelVersion = modelMapper.selectById(modelId);
|
||||||
|
String modelConfig = modelVersion.getModelConfig();
|
||||||
|
//假设modelConfig只存GPU数据
|
||||||
|
request.setModelId(String.valueOf(modelId));
|
||||||
|
request.setRequiredMemory(Integer.parseInt(modelConfig));
|
||||||
|
modelDeployer.deploy(request);
|
||||||
// 获取前端传来的IP字符串
|
// 获取前端传来的IP字符串
|
||||||
String ipListStr = servicePublishDTO.getIp();
|
String ipListStr = servicePublishDTO.getIp();
|
||||||
if (ipListStr == null || ipListStr.trim().isEmpty()) {
|
if (ipListStr == null || ipListStr.trim().isEmpty()) {
|
||||||
|
@ -3,8 +3,10 @@ package com.bipt.intelligentapplicationorchestrationservice.service;
|
|||||||
import com.bipt.intelligentapplicationorchestrationservice.mapper.GpuResourceDao;
|
import com.bipt.intelligentapplicationorchestrationservice.mapper.GpuResourceDao;
|
||||||
import com.bipt.intelligentapplicationorchestrationservice.exception.CacheInitException;
|
import com.bipt.intelligentapplicationorchestrationservice.exception.CacheInitException;
|
||||||
import com.bipt.intelligentapplicationorchestrationservice.entity.GpuResource;
|
import com.bipt.intelligentapplicationorchestrationservice.entity.GpuResource;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import jakarta.annotation.PostConstruct;
|
import jakarta.annotation.PostConstruct;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.data.redis.RedisConnectionFailureException;
|
import org.springframework.data.redis.RedisConnectionFailureException;
|
||||||
@ -18,7 +20,6 @@ import java.util.List;
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
@Transactional // 添加类级别事务管理
|
|
||||||
@Component
|
@Component
|
||||||
public class CacheManager {
|
public class CacheManager {
|
||||||
@Autowired
|
@Autowired
|
||||||
@ -27,6 +28,9 @@ public class CacheManager {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private GpuResourceDao gpuResourceDao;
|
private GpuResourceDao gpuResourceDao;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ObjectMapper objectMapper; // 注入ObjectMapper用于类型转换
|
||||||
|
|
||||||
private final ReentrantLock lock = new ReentrantLock();
|
private final ReentrantLock lock = new ReentrantLock();
|
||||||
|
|
||||||
@Value("${cache.redis-key-prefix:gpu:}")
|
@Value("${cache.redis-key-prefix:gpu:}")
|
||||||
@ -38,9 +42,9 @@ public class CacheManager {
|
|||||||
@Value("${cache.init-batch-size:500}")
|
@Value("${cache.init-batch-size:500}")
|
||||||
private int initBatchSize;
|
private int initBatchSize;
|
||||||
|
|
||||||
private static final Logger log = org.slf4j.LoggerFactory.getLogger(CacheManager.class);
|
private static final Logger log = LoggerFactory.getLogger(CacheManager.class);
|
||||||
|
|
||||||
// 全量加载(带分页和分布式锁)
|
// 全量加载(带分页和分布式锁)
|
||||||
@Transactional(propagation = Propagation.REQUIRED) // 方法级别覆盖
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void loadFullCache() {
|
public void loadFullCache() {
|
||||||
if (tryLock()) {
|
if (tryLock()) {
|
||||||
@ -82,16 +86,12 @@ public class CacheManager {
|
|||||||
// 带随机TTL的缓存设置
|
// 带随机TTL的缓存设置
|
||||||
private void setCacheWithTTL(GpuResource entity) {
|
private void setCacheWithTTL(GpuResource entity) {
|
||||||
String key = buildKey(entity.getGPUId().toString());
|
String key = buildKey(entity.getGPUId().toString());
|
||||||
GpuResource cached = (GpuResource) redisTemplate.opsForValue().get(key);
|
|
||||||
|
|
||||||
// 保留原有内存字段值
|
// 直接存储实体对象,确保类型一致性
|
||||||
if (cached != null && cached.getGPUMemorySize() != null) {
|
|
||||||
entity.setGPUMemorySize(cached.getGPUMemorySize());
|
|
||||||
}
|
|
||||||
redisTemplate.opsForValue().set(
|
redisTemplate.opsForValue().set(
|
||||||
key,
|
key,
|
||||||
entity,
|
entity,
|
||||||
ttlBase + (int)(Math.random() * 600), // 随机TTL防止雪崩
|
ttlBase + (int)(Math.random() * 600),
|
||||||
TimeUnit.SECONDS
|
TimeUnit.SECONDS
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -114,6 +114,7 @@ public class CacheManager {
|
|||||||
private void unlock() {
|
private void unlock() {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
// 分页加载入口
|
// 分页加载入口
|
||||||
public void loadFullCache(int batchSize) {
|
public void loadFullCache(int batchSize) {
|
||||||
int page = 0;
|
int page = 0;
|
||||||
@ -121,12 +122,11 @@ public class CacheManager {
|
|||||||
List<GpuResource> batch = gpuResourceDao.findByPage(page * batchSize, batchSize);
|
List<GpuResource> batch = gpuResourceDao.findByPage(page * batchSize, batchSize);
|
||||||
if (batch.isEmpty()) break;
|
if (batch.isEmpty()) break;
|
||||||
|
|
||||||
batch.forEach(this::refreshWithRetry); // 带重试的刷新逻辑
|
batch.forEach(this::refreshWithRetry);
|
||||||
page++;
|
page++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// 带重试机制的缓存刷新
|
// 带重试机制的缓存刷新
|
||||||
public void refreshWithRetry(GpuResource entity) {
|
public void refreshWithRetry(GpuResource entity) {
|
||||||
try {
|
try {
|
||||||
@ -135,7 +135,7 @@ public class CacheManager {
|
|||||||
// 3次重试逻辑
|
// 3次重试逻辑
|
||||||
for (int i = 0; i < 3; i++) {
|
for (int i = 0; i < 3; i++) {
|
||||||
try {
|
try {
|
||||||
log.info("重试第 {} 次", i + 1); // 添加日志
|
log.info("重试第 {} 次", i + 1);
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
setCacheWithTTL(entity);
|
setCacheWithTTL(entity);
|
||||||
return;
|
return;
|
||||||
@ -148,7 +148,6 @@ public class CacheManager {
|
|||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -162,8 +161,35 @@ public class CacheManager {
|
|||||||
redisTemplate.delete(key);
|
redisTemplate.delete(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 修改获取缓存的方法,增加类型安全处理
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public GpuResource getFromCache(String gpuId) {
|
public GpuResource getFromCache(String gpuId) {
|
||||||
return (GpuResource) redisTemplate.opsForValue().get("gpu:" + gpuId);
|
String key = buildKey(gpuId);
|
||||||
}
|
Object value = redisTemplate.opsForValue().get(key);
|
||||||
|
|
||||||
|
// 处理可能的类型不匹配问题
|
||||||
|
if (value == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 优先尝试直接转换
|
||||||
|
if (value instanceof GpuResource) {
|
||||||
|
return (GpuResource) value;
|
||||||
|
}
|
||||||
|
// 如果是LinkedHashMap,使用ObjectMapper转换
|
||||||
|
else if (value instanceof java.util.LinkedHashMap) {
|
||||||
|
return objectMapper.convertValue(value, GpuResource.class);
|
||||||
|
}
|
||||||
|
// 其他情况尝试序列化后反序列化(适用于JSON存储场景)
|
||||||
|
else {
|
||||||
|
// 先序列化为JSON字符串,再反序列化为对象
|
||||||
|
String json = objectMapper.writeValueAsString(value);
|
||||||
|
return objectMapper.readValue(json, GpuResource.class);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("获取缓存时类型转换失败,key: {}, valueType: {}", key, value.getClass().getName(), e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
Reference in New Issue
Block a user