xiaohucoding #13

Manually merged
lpz merged 5 commits from xiaohucoding into main 2025-05-29 09:36:32 +08:00
16 changed files with 437 additions and 176 deletions
Showing only changes of commit 77fb43e95d - Show all commits

View File

@ -51,10 +51,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<!-- <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
@ -100,6 +100,10 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!-- 测试依赖 -->
<dependency>

View File

@ -11,17 +11,13 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Tag(name ="服务发布相关接口")
@RestController
@RequestMapping("/publish")
@Slf4j
public class publishController {
public class PublishController {
@Autowired
private PublishService publishService;
@ -38,19 +34,35 @@ public class publishController {
publishService.save(servicePublishDTO);
//todo 调用模型部署
// 获取前端传来的IP字符串
String ipListStr = servicePublishDTO.getIp();
if (ipListStr == null || ipListStr.trim().isEmpty()) {
log.warn("IP列表为空不进行Nacos注册");
return OptResult.success();
}
try {
// 新增传递apiUrl参数
nacosServiceUtil.registerService(
servicePublishDTO.getModelId().toString(),
servicePublishDTO.getIp(),
8080,
servicePublishDTO.getApiUrl()
);
log.info("Nacos服务注册成功");
// 使用逗号分割IP字符串
String[] ipArray = ipListStr.split(",");
// 循环注册每个IP到Nacos
for (String ip : ipArray) {
String trimmedIp = ip.trim();
if (!trimmedIp.isEmpty()) {
nacosServiceUtil.registerService(
servicePublishDTO.getModelId().toString(),
trimmedIp,
8080,
servicePublishDTO.getApiUrl()
);
log.info("Nacos服务注册成功: {}", trimmedIp);
}
}
} catch (Exception e) {
log.error("Nacos服务注册失败", e);
return OptResult.error("Nacos服务注册失败"); // 根据业务需求返回错误
}
return OptResult.success();
}

View File

@ -0,0 +1,178 @@
package com.bipt.intelligentapplicationorchestrationservice.controller;
import com.bipt.intelligentapplicationorchestrationservice.pojo.OptResult;
import com.bipt.intelligentapplicationorchestrationservice.service.ServiceAPIService;
import com.bipt.intelligentapplicationorchestrationservice.util.NacosServiceUtil;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import io.swagger.v3.oas.models.security.SecurityScheme;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Tag(name ="服务API相关接口")
@RestController
@RequestMapping("/API")
@Slf4j
public class ServiceAPIController {
@Autowired
private ServiceAPIService serviceAPIService;
@Autowired
private NacosServiceUtil nacosServiceUtil;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@PostMapping("/release")
@Operation(summary = "结束访问")
@Transactional
public OptResult releaseResource(@PathVariable Long modelId) {
String key = "modelId:" + modelId;
String modelConfig = (String) redisTemplate.opsForValue().get(key);
int userMemorySize = parseGpuMemorySize(modelConfig);
List<String> instanceIps;
try {
instanceIps = nacosServiceUtil.getServiceInstances(modelId.toString());
} catch (Exception e) {
log.error("获取Nacos实例失败", e);
return OptResult.error("获取实例失败");
}
int memorySize;
for (String ip : instanceIps) {
String ipKey = "ip:" + ip;
Integer nowMemorySizeOBJ = (Integer) redisTemplate.opsForValue().get(ipKey);
int nowMemorySize = nowMemorySizeOBJ;
memorySize = nowMemorySize + userMemorySize;
// 更新IP对应的资源值
redisTemplate.opsForValue().set(ipKey, memorySize);
// 设置缓存过期时间3600秒
redisTemplate.expire(ipKey, 3600, TimeUnit.SECONDS);
}
// 处理等待队列(先来先服务)
String waitQueueKey = "waitQueue:" + modelId;
// 取出队列头部的任务(最早加入的)
Long waitModelId = (Long) redisTemplate.opsForList().leftPop(waitQueueKey);
if (waitModelId != null) {
log.info("检测到等待队列任务尝试调度模型ID: {}", waitModelId);
return schedule(waitModelId); // 重新调度最早的任务
} else {
log.info("等待队列为空,无任务需要处理");
}
return OptResult.success("资源释放成功");
}
@PostMapping("/request")
@Operation(summary = "请求调度")
@Transactional
public OptResult schedule(@PathVariable Long modelId) {
// 1. 存储modelConfig到缓存
String modelConfig = serviceAPIService.getByModelId(modelId);
int requestMemorySize = parseGpuMemorySize(modelConfig);
String modelConfigKey = "modelConfig:" + modelId;
redisTemplate.opsForValue().set(modelConfigKey, modelConfig);
// 2. 获取Nacos实例IP列表
List<String> instanceIps;
try {
instanceIps = nacosServiceUtil.getServiceInstances(modelId.toString());
} catch (Exception e) {
log.error("获取Nacos实例失败", e);
return OptResult.error("获取实例失败");
}
Set<String> gpuKeys = redisTemplate.keys("gpu:*");
//根据IP列表查找资源
for (String instanceIp : instanceIps) {
for (String gpuKey : gpuKeys) {
String GPUConfig = (String) redisTemplate.opsForValue().get(gpuKey);
if (GPUConfig != null) {
// 分割键值对
String[] pairs = GPUConfig.split(",");
String ip = null;
int memorySize = 0;
for (String pair : pairs) {
String[] keyValue = pair.split(":", 2);
if (keyValue.length == 2) {
String key = keyValue[0].trim();
String value = keyValue[1].trim();
if ("IP".equalsIgnoreCase(key)) {
ip = value;
} else if ("GPUMemorySize".equalsIgnoreCase(key)) {
memorySize = Integer.parseInt(value);
}
}
}
// 检查解析出的 IP 是否在 Nacos 实例列表中
if (instanceIp.equals(ip)) {
log.info("找到 IP {} 对应的 GPU 内存: {} ", ip, memorySize);
if (memorySize>=requestMemorySize){
int newMemorySize = memorySize - requestMemorySize;
String ipKey = "ip:" + ip;
redisTemplate.opsForValue().set(ipKey,newMemorySize);
//访问请求最大时间为3600s
redisTemplate.expire(ipKey, 3600, TimeUnit.SECONDS);
}
return OptResult.success("资源分配成功使用ip:" + ip);
}else {
log.info("资源不足");
}
}
}
}
// 所有实例检查完毕未找到足够资源
String waitQueueKey = "waitQueue:" + modelId;
// 改为右插入保证队列顺序为FIFO最早的任务在列表头部
redisTemplate.opsForList().rightPush(waitQueueKey, modelId);
log.info("未找到足够资源,任务 {} 加入等待队列", modelId);
return OptResult.error("资源不足,等待中");
}
/**
* 从模型配置字符串中解析GPU内存需求
* @param modelConfig 模型配置字符串,格式如 "GPUMemorySize:8000,version:1"
* @return 解析到的GPU内存大小MB若解析失败返回-1
*/
private int parseGpuMemorySize(String modelConfig) {
if (modelConfig == null || modelConfig.isEmpty()) {
log.error("模型配置为空无法解析GPU内存需求");
return -1;
}
int requestMemorySize = 0;
String[] config = modelConfig.split(",");
for (String pair : config) {
// 按冒号分割键值对
String[] keyValue = pair.split(":", 2);
if (keyValue.length == 2) {
String key = keyValue[0].trim();
String value = keyValue[1].trim();
// 匹配 GPUMemorySize 字段(忽略大小写)
if ("GPUMemorySize".equalsIgnoreCase(key)) {
try {
requestMemorySize = Integer.parseInt(value);
log.info("模型GPU内存: {} MB", requestMemorySize);
break; // 找到后即可退出循环
} catch (NumberFormatException e) {
log.error("解析GPUMemorySize失败值: {}", value, e);
return -1;
}
}
}
}
if (requestMemorySize <= 0) {
log.error("模型需求GPU内存未配置或无效");
return -1;
}
return requestMemorySize;
}
}

View File

@ -14,7 +14,6 @@ public interface PublishMapper {
void insert(ServicePublishDTO servicePublishDTO);
Long getByApiUrl(String apiUrl);
@Select("select model_config from model_version where model_id=#{modelId} and status = 1")
String getById(Long modelId);
}

View File

@ -0,0 +1,11 @@
package com.bipt.intelligentapplicationorchestrationservice.mapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
@Mapper
public interface ServiceAPIMapper {
@Select("select model_config from model_version where model_id=#{modelId} and status = 1")
String getById(Long modelId);
}

View File

@ -29,18 +29,12 @@ public class PublishServiceImpl implements PublishService {
if (id != null){
throw new IllegalArgumentException("请求已存在: " + apiUrl);
}
//todo调用服务部署
publishMapper.insert(servicePublishDTO);
}
/**
* 根据id查找配置信息
* @param modelId
* @return
*/
@Override
public String getByModelId(Long modelId) {
return publishMapper.getById(modelId);
}
}

View File

@ -0,0 +1,23 @@
package com.bipt.intelligentapplicationorchestrationservice.service.Impl;
import com.bipt.intelligentapplicationorchestrationservice.mapper.ServiceAPIMapper;
import com.bipt.intelligentapplicationorchestrationservice.service.ServiceAPIService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class ServiceAPIImpl implements ServiceAPIService {
@Autowired
private ServiceAPIMapper serviceAPIMapper;
/**
* 根据id查找配置信息
* @param modelId
* @return
*/
@Override
public String getByModelId(Long modelId) {
return serviceAPIMapper.getById(modelId);
};
}

View File

@ -9,5 +9,4 @@ public interface PublishService {
String getByModelId(Long modelId);
}

View File

@ -0,0 +1,5 @@
package com.bipt.intelligentapplicationorchestrationservice.service;
public interface ServiceAPIService {
String getByModelId(Long modelId);
}

View File

@ -18,4 +18,10 @@ spring.data.redis.host=116.205.121.200
spring.data.redis.port=6379
spring.data.redis.password=Jbjhhzstsl97@
spring.data.redis.database=0
spring.data.redis.timeout=3000
spring.data.redis.timeout=3000
# 服务路由配置
spring.cloud.gateway.routes[0].id=request-service-route
spring.cloud.gateway.routes[0].uri=lb://intelligent-application-orchestration-service
spring.cloud.gateway.routes[0].predicates[0]=Path=/request