Merge branch 'xiaohucoding'
# Conflicts: # doc/WorkReport/2025-05-hky.md # src/main/resources/application.properties
This commit is contained in:
@ -0,0 +1,75 @@
|
||||
package com.bipt.intelligentapplicationorchestrationservice.controller;
|
||||
|
||||
import com.bipt.intelligentapplicationorchestrationservice.pojo.*;
|
||||
import com.bipt.intelligentapplicationorchestrationservice.service.PublishService;
|
||||
import com.bipt.intelligentapplicationorchestrationservice.util.NacosServiceUtil;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Tag(name ="服务发布相关接口")
|
||||
@RestController
|
||||
@RequestMapping("/publish")
|
||||
@Slf4j
|
||||
public class PublishController {
|
||||
@Autowired
|
||||
private PublishService publishService;
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate redisTemplate;
|
||||
@Autowired
|
||||
private NacosServiceUtil nacosServiceUtil;
|
||||
|
||||
@PostMapping
|
||||
@Operation(summary ="新增发布请求")
|
||||
@Transactional
|
||||
public OptResult<List<ServicePublishVO>> save(@RequestBody ServicePublishDTO servicePublishDTO) {
|
||||
log.info("模型发布请求:{}", servicePublishDTO);
|
||||
publishService.save(servicePublishDTO);
|
||||
//todo 调用模型部署
|
||||
|
||||
// 获取前端传来的IP字符串
|
||||
String ipListStr = servicePublishDTO.getIp();
|
||||
if (ipListStr == null || ipListStr.trim().isEmpty()) {
|
||||
log.warn("IP列表为空,不进行Nacos注册");
|
||||
return OptResult.success();
|
||||
}
|
||||
|
||||
try {
|
||||
// 使用逗号分割IP字符串
|
||||
String[] ipArray = ipListStr.split(",");
|
||||
// 循环注册每个IP到Nacos
|
||||
for (String ip : ipArray) {
|
||||
String trimmedIp = ip.trim();
|
||||
if (!trimmedIp.isEmpty()) {
|
||||
nacosServiceUtil.registerService(
|
||||
servicePublishDTO.getModelId().toString(),
|
||||
trimmedIp,
|
||||
8080,
|
||||
servicePublishDTO.getApiUrl()
|
||||
);
|
||||
log.info("Nacos服务注册成功: {}", trimmedIp);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Nacos服务注册失败", e);
|
||||
return OptResult.error("Nacos服务注册失败"); // 根据业务需求返回错误
|
||||
}
|
||||
|
||||
|
||||
return OptResult.success();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,178 @@
|
||||
package com.bipt.intelligentapplicationorchestrationservice.controller;
|
||||
|
||||
|
||||
import com.bipt.intelligentapplicationorchestrationservice.pojo.OptResult;
|
||||
import com.bipt.intelligentapplicationorchestrationservice.service.ServiceAPIService;
|
||||
import com.bipt.intelligentapplicationorchestrationservice.util.NacosServiceUtil;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import io.swagger.v3.oas.models.security.SecurityScheme;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Tag(name ="服务API相关接口")
|
||||
@RestController
|
||||
@RequestMapping("/API")
|
||||
@Slf4j
|
||||
public class ServiceAPIController {
|
||||
|
||||
@Autowired
|
||||
private ServiceAPIService serviceAPIService;
|
||||
|
||||
@Autowired
|
||||
private NacosServiceUtil nacosServiceUtil;
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
@PostMapping("/release")
|
||||
@Operation(summary = "结束访问")
|
||||
@Transactional
|
||||
public OptResult releaseResource(@PathVariable Long modelId) {
|
||||
String key = "modelId:" + modelId;
|
||||
String modelConfig = (String) redisTemplate.opsForValue().get(key);
|
||||
int userMemorySize = parseGpuMemorySize(modelConfig);
|
||||
List<String> instanceIps;
|
||||
try {
|
||||
instanceIps = nacosServiceUtil.getServiceInstances(modelId.toString());
|
||||
} catch (Exception e) {
|
||||
log.error("获取Nacos实例失败", e);
|
||||
return OptResult.error("获取实例失败");
|
||||
}
|
||||
int memorySize;
|
||||
for (String ip : instanceIps) {
|
||||
String ipKey = "ip:" + ip;
|
||||
Integer nowMemorySizeOBJ = (Integer) redisTemplate.opsForValue().get(ipKey);
|
||||
int nowMemorySize = nowMemorySizeOBJ;
|
||||
memorySize = nowMemorySize + userMemorySize;
|
||||
// 更新IP对应的资源值
|
||||
redisTemplate.opsForValue().set(ipKey, memorySize);
|
||||
// 设置缓存过期时间(3600秒)
|
||||
redisTemplate.expire(ipKey, 3600, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
// 处理等待队列(先来先服务)
|
||||
String waitQueueKey = "waitQueue:" + modelId;
|
||||
// 取出队列头部的任务(最早加入的)
|
||||
Long waitModelId = (Long) redisTemplate.opsForList().leftPop(waitQueueKey);
|
||||
if (waitModelId != null) {
|
||||
log.info("检测到等待队列任务,尝试调度模型ID: {}", waitModelId);
|
||||
return schedule(waitModelId); // 重新调度最早的任务
|
||||
} else {
|
||||
log.info("等待队列为空,无任务需要处理");
|
||||
}
|
||||
|
||||
return OptResult.success("资源释放成功");
|
||||
}
|
||||
|
||||
@PostMapping("/request")
|
||||
@Operation(summary = "请求调度")
|
||||
@Transactional
|
||||
public OptResult schedule(@PathVariable Long modelId) {
|
||||
// 1. 存储modelConfig到缓存
|
||||
String modelConfig = serviceAPIService.getByModelId(modelId);
|
||||
int requestMemorySize = parseGpuMemorySize(modelConfig);
|
||||
String modelConfigKey = "modelConfig:" + modelId;
|
||||
redisTemplate.opsForValue().set(modelConfigKey, modelConfig);
|
||||
// 2. 获取Nacos实例IP列表
|
||||
List<String> instanceIps;
|
||||
try {
|
||||
instanceIps = nacosServiceUtil.getServiceInstances(modelId.toString());
|
||||
} catch (Exception e) {
|
||||
log.error("获取Nacos实例失败", e);
|
||||
return OptResult.error("获取实例失败");
|
||||
}
|
||||
Set<String> gpuKeys = redisTemplate.keys("gpu:*");
|
||||
//根据IP列表查找资源
|
||||
for (String instanceIp : instanceIps) {
|
||||
for (String gpuKey : gpuKeys) {
|
||||
String GPUConfig = (String) redisTemplate.opsForValue().get(gpuKey);
|
||||
if (GPUConfig != null) {
|
||||
// 分割键值对
|
||||
String[] pairs = GPUConfig.split(",");
|
||||
String ip = null;
|
||||
int memorySize = 0;
|
||||
for (String pair : pairs) {
|
||||
String[] keyValue = pair.split(":", 2);
|
||||
if (keyValue.length == 2) {
|
||||
String key = keyValue[0].trim();
|
||||
String value = keyValue[1].trim();
|
||||
if ("IP".equalsIgnoreCase(key)) {
|
||||
ip = value;
|
||||
} else if ("GPUMemorySize".equalsIgnoreCase(key)) {
|
||||
memorySize = Integer.parseInt(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
// 检查解析出的 IP 是否在 Nacos 实例列表中
|
||||
if (instanceIp.equals(ip)) {
|
||||
log.info("找到 IP {} 对应的 GPU 内存: {} ", ip, memorySize);
|
||||
if (memorySize>=requestMemorySize){
|
||||
int newMemorySize = memorySize - requestMemorySize;
|
||||
String ipKey = "ip:" + ip;
|
||||
redisTemplate.opsForValue().set(ipKey,newMemorySize);
|
||||
//访问请求最大时间为3600s
|
||||
redisTemplate.expire(ipKey, 3600, TimeUnit.SECONDS);
|
||||
}
|
||||
return OptResult.success("资源分配成功,使用ip:" + ip);
|
||||
}else {
|
||||
log.info("资源不足");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// 所有实例检查完毕未找到足够资源
|
||||
String waitQueueKey = "waitQueue:" + modelId;
|
||||
// 改为右插入,保证队列顺序为FIFO(最早的任务在列表头部)
|
||||
redisTemplate.opsForList().rightPush(waitQueueKey, modelId);
|
||||
log.info("未找到足够资源,任务 {} 加入等待队列", modelId);
|
||||
return OptResult.error("资源不足,等待中");
|
||||
}
|
||||
/**
|
||||
* 从模型配置字符串中解析GPU内存需求
|
||||
* @param modelConfig 模型配置字符串,格式如 "GPUMemorySize:8000,version:1"
|
||||
* @return 解析到的GPU内存大小(MB),若解析失败返回-1
|
||||
*/
|
||||
private int parseGpuMemorySize(String modelConfig) {
|
||||
if (modelConfig == null || modelConfig.isEmpty()) {
|
||||
log.error("模型配置为空,无法解析GPU内存需求");
|
||||
return -1;
|
||||
}
|
||||
int requestMemorySize = 0;
|
||||
String[] config = modelConfig.split(",");
|
||||
for (String pair : config) {
|
||||
// 按冒号分割键值对
|
||||
String[] keyValue = pair.split(":", 2);
|
||||
if (keyValue.length == 2) {
|
||||
String key = keyValue[0].trim();
|
||||
String value = keyValue[1].trim();
|
||||
// 匹配 GPUMemorySize 字段(忽略大小写)
|
||||
if ("GPUMemorySize".equalsIgnoreCase(key)) {
|
||||
try {
|
||||
requestMemorySize = Integer.parseInt(value);
|
||||
log.info("模型GPU内存: {} MB", requestMemorySize);
|
||||
break; // 找到后即可退出循环
|
||||
} catch (NumberFormatException e) {
|
||||
log.error("解析GPUMemorySize失败,值: {}", value, e);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (requestMemorySize <= 0) {
|
||||
log.error("模型需求GPU内存未配置或无效");
|
||||
return -1;
|
||||
}
|
||||
return requestMemorySize;
|
||||
}
|
||||
|
||||
}
|
@ -1,112 +0,0 @@
|
||||
package com.bipt.intelligentapplicationorchestrationservice.controller;
|
||||
|
||||
import com.bipt.intelligentapplicationorchestrationservice.config.RedisConfiguration;
|
||||
import com.bipt.intelligentapplicationorchestrationservice.pojo.*;
|
||||
import com.bipt.intelligentapplicationorchestrationservice.service.PublishService;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Tag(name ="服务发布相关接口")
|
||||
@RestController
|
||||
@RequestMapping("/publish")
|
||||
@Slf4j
|
||||
public class publishController {
|
||||
@Autowired
|
||||
private PublishService publishService;
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate redisTemplate;
|
||||
/**
|
||||
* 新增请求发布
|
||||
* @param servicePublishDTO
|
||||
* @return
|
||||
*/
|
||||
@PostMapping
|
||||
@Operation(summary ="新增发布请求")
|
||||
@Transactional
|
||||
public OptResult<List<ServicePublishVO>> save(@RequestBody ServicePublishDTO servicePublishDTO) {
|
||||
log.info("模型发布请求:{}", servicePublishDTO);
|
||||
publishService.save(servicePublishDTO);
|
||||
Long modelId = servicePublishDTO.getModelId();
|
||||
String key = "Model_" + modelId;
|
||||
//查询redis是否存在GPU相关资源数据
|
||||
List<ServicePublishVO> list;
|
||||
list = (List<ServicePublishVO>) redisTemplate.opsForValue().get(key);
|
||||
//如果存在,直接返回,无须查询数据库
|
||||
if (list != null) {
|
||||
return OptResult.success(list);
|
||||
}else {
|
||||
list = new ArrayList<>();
|
||||
}
|
||||
String modelConfig = publishService.getByModelId(modelId);
|
||||
if (modelConfig == null) {
|
||||
log.error("模型配置为空,modelId={}", modelId);
|
||||
}
|
||||
String[] keyValuePairs = modelConfig.split("\\|");
|
||||
String GPUMemorySize = null;
|
||||
String GPUModel = null;
|
||||
for (String pair : keyValuePairs) {
|
||||
pair = pair.trim();
|
||||
if (pair.startsWith("GPU")) {
|
||||
GPUModel = pair.split(";", 2)[1];
|
||||
} else if (pair.startsWith("Memory:")) {
|
||||
GPUMemorySize = pair.split(":", 2)[1];
|
||||
}
|
||||
}
|
||||
ServicePublishVO servicePublishVO = new ServicePublishVO();
|
||||
servicePublishVO.setIp(servicePublishDTO.getIp());
|
||||
servicePublishVO.setModelId(servicePublishDTO.getModelId());
|
||||
servicePublishVO.setGPUMemorySize(GPUMemorySize);
|
||||
servicePublishVO.setGPUModel(GPUModel);
|
||||
//todo 调用模型部署,传递信息
|
||||
|
||||
servicePublishVO.setApiUrl(servicePublishDTO.getApiUrl());
|
||||
list.add(servicePublishVO);
|
||||
redisTemplate.opsForValue().set(key,list);
|
||||
//一个ip上有多个机器
|
||||
// 假设从 Redis 获取的列表元素是 MachineInfo 类型
|
||||
String ip = servicePublishVO.getIp();
|
||||
String key1 = ip;
|
||||
List<MachineInfo> machineList = (List<MachineInfo>) redisTemplate.opsForValue().get(key1);
|
||||
|
||||
|
||||
// 模型所需的 GPU 资源
|
||||
String requiredGPUModel = servicePublishVO.getGPUModel();
|
||||
Integer requiredGPUMemory = Integer.valueOf(servicePublishVO.getGPUMemorySize());
|
||||
|
||||
if (machineList != null) {
|
||||
for (MachineInfo machine : machineList) {
|
||||
// 获取机器的 GPU 资源
|
||||
String machineGPUModel = machine.getGPUModel();
|
||||
Integer machineGPUMemory = machine.getGPUMemorySize();
|
||||
|
||||
// 判断机器是否满足模型需求
|
||||
if (requiredGPUModel.equals(machineGPUModel) &&
|
||||
machineGPUMemory >= requiredGPUMemory) {
|
||||
return OptResult.success(list);
|
||||
}
|
||||
}
|
||||
String key3 = "wait_queue";
|
||||
redisTemplate.opsForValue().set(key3,list);
|
||||
//todo资源释放时候优先分配等待队列中任务
|
||||
|
||||
}
|
||||
|
||||
return OptResult.success(list);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
Reference in New Issue
Block a user