diff --git a/pom.xml b/pom.xml index 7b91012..5aac308 100644 --- a/pom.xml +++ b/pom.xml @@ -51,10 +51,10 @@ org.springframework.boot spring-boot-starter-jdbc - + org.springframework.boot spring-boot-starter-data-redis @@ -100,6 +100,10 @@ org.springframework.cloud spring-cloud-starter-bootstrap + + org.springframework.cloud + spring-cloud-starter-gateway + diff --git a/src/main/java/com/bipt/intelligentapplicationorchestrationservice/controller/publishController.java b/src/main/java/com/bipt/intelligentapplicationorchestrationservice/controller/PublishController.java similarity index 57% rename from src/main/java/com/bipt/intelligentapplicationorchestrationservice/controller/publishController.java rename to src/main/java/com/bipt/intelligentapplicationorchestrationservice/controller/PublishController.java index ebfe15f..5cdf455 100644 --- a/src/main/java/com/bipt/intelligentapplicationorchestrationservice/controller/publishController.java +++ b/src/main/java/com/bipt/intelligentapplicationorchestrationservice/controller/PublishController.java @@ -11,17 +11,13 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.*; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; @Tag(name ="服务发布相关接口") @RestController @RequestMapping("/publish") @Slf4j -public class publishController { +public class PublishController { @Autowired private PublishService publishService; @@ -38,19 +34,35 @@ public class publishController { publishService.save(servicePublishDTO); //todo 调用模型部署 + // 获取前端传来的IP字符串 + String ipListStr = servicePublishDTO.getIp(); + if (ipListStr == null || ipListStr.trim().isEmpty()) { + log.warn("IP列表为空,不进行Nacos注册"); + return OptResult.success(); + } + try { - // 新增传递apiUrl参数 - nacosServiceUtil.registerService( - servicePublishDTO.getModelId().toString(), - servicePublishDTO.getIp(), - 8080, - servicePublishDTO.getApiUrl() - ); - log.info("Nacos服务注册成功"); + // 使用逗号分割IP字符串 + String[] ipArray = ipListStr.split(","); + // 循环注册每个IP到Nacos + for (String ip : ipArray) { + String trimmedIp = ip.trim(); + if (!trimmedIp.isEmpty()) { + nacosServiceUtil.registerService( + servicePublishDTO.getModelId().toString(), + trimmedIp, + 8080, + servicePublishDTO.getApiUrl() + ); + log.info("Nacos服务注册成功: {}", trimmedIp); + } + } } catch (Exception e) { log.error("Nacos服务注册失败", e); + return OptResult.error("Nacos服务注册失败"); // 根据业务需求返回错误 } + return OptResult.success(); } diff --git a/src/main/java/com/bipt/intelligentapplicationorchestrationservice/controller/ServiceAPIController.java b/src/main/java/com/bipt/intelligentapplicationorchestrationservice/controller/ServiceAPIController.java new file mode 100644 index 0000000..f08513e --- /dev/null +++ b/src/main/java/com/bipt/intelligentapplicationorchestrationservice/controller/ServiceAPIController.java @@ -0,0 +1,178 @@ +package com.bipt.intelligentapplicationorchestrationservice.controller; + + +import com.bipt.intelligentapplicationorchestrationservice.pojo.OptResult; +import com.bipt.intelligentapplicationorchestrationservice.service.ServiceAPIService; +import com.bipt.intelligentapplicationorchestrationservice.util.NacosServiceUtil; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import io.swagger.v3.oas.models.security.SecurityScheme; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@Tag(name ="服务API相关接口") +@RestController +@RequestMapping("/API") +@Slf4j +public class ServiceAPIController { + + @Autowired + private ServiceAPIService serviceAPIService; + + @Autowired + private NacosServiceUtil nacosServiceUtil; + + @Autowired + private RedisTemplate redisTemplate; + @PostMapping("/release") + @Operation(summary = "结束访问") + @Transactional + public OptResult releaseResource(@PathVariable Long modelId) { + String key = "modelId:" + modelId; + String modelConfig = (String) redisTemplate.opsForValue().get(key); + int userMemorySize = parseGpuMemorySize(modelConfig); + List instanceIps; + try { + instanceIps = nacosServiceUtil.getServiceInstances(modelId.toString()); + } catch (Exception e) { + log.error("获取Nacos实例失败", e); + return OptResult.error("获取实例失败"); + } + int memorySize; + for (String ip : instanceIps) { + String ipKey = "ip:" + ip; + Integer nowMemorySizeOBJ = (Integer) redisTemplate.opsForValue().get(ipKey); + int nowMemorySize = nowMemorySizeOBJ; + memorySize = nowMemorySize + userMemorySize; + // 更新IP对应的资源值 + redisTemplate.opsForValue().set(ipKey, memorySize); + // 设置缓存过期时间(3600秒) + redisTemplate.expire(ipKey, 3600, TimeUnit.SECONDS); + } + + // 处理等待队列(先来先服务) + String waitQueueKey = "waitQueue:" + modelId; + // 取出队列头部的任务(最早加入的) + Long waitModelId = (Long) redisTemplate.opsForList().leftPop(waitQueueKey); + if (waitModelId != null) { + log.info("检测到等待队列任务,尝试调度模型ID: {}", waitModelId); + return schedule(waitModelId); // 重新调度最早的任务 + } else { + log.info("等待队列为空,无任务需要处理"); + } + + return OptResult.success("资源释放成功"); + } + + @PostMapping("/request") + @Operation(summary = "请求调度") + @Transactional + public OptResult schedule(@PathVariable Long modelId) { + // 1. 存储modelConfig到缓存 + String modelConfig = serviceAPIService.getByModelId(modelId); + int requestMemorySize = parseGpuMemorySize(modelConfig); + String modelConfigKey = "modelConfig:" + modelId; + redisTemplate.opsForValue().set(modelConfigKey, modelConfig); + // 2. 获取Nacos实例IP列表 + List instanceIps; + try { + instanceIps = nacosServiceUtil.getServiceInstances(modelId.toString()); + } catch (Exception e) { + log.error("获取Nacos实例失败", e); + return OptResult.error("获取实例失败"); + } + Set gpuKeys = redisTemplate.keys("gpu:*"); + //根据IP列表查找资源 + for (String instanceIp : instanceIps) { + for (String gpuKey : gpuKeys) { + String GPUConfig = (String) redisTemplate.opsForValue().get(gpuKey); + if (GPUConfig != null) { + // 分割键值对 + String[] pairs = GPUConfig.split(","); + String ip = null; + int memorySize = 0; + for (String pair : pairs) { + String[] keyValue = pair.split(":", 2); + if (keyValue.length == 2) { + String key = keyValue[0].trim(); + String value = keyValue[1].trim(); + if ("IP".equalsIgnoreCase(key)) { + ip = value; + } else if ("GPUMemorySize".equalsIgnoreCase(key)) { + memorySize = Integer.parseInt(value); + } + } + } + // 检查解析出的 IP 是否在 Nacos 实例列表中 + if (instanceIp.equals(ip)) { + log.info("找到 IP {} 对应的 GPU 内存: {} ", ip, memorySize); + if (memorySize>=requestMemorySize){ + int newMemorySize = memorySize - requestMemorySize; + String ipKey = "ip:" + ip; + redisTemplate.opsForValue().set(ipKey,newMemorySize); + //访问请求最大时间为3600s + redisTemplate.expire(ipKey, 3600, TimeUnit.SECONDS); + } + return OptResult.success("资源分配成功,使用ip:" + ip); + }else { + log.info("资源不足"); + } + } + } + } + // 所有实例检查完毕未找到足够资源 + String waitQueueKey = "waitQueue:" + modelId; + // 改为右插入,保证队列顺序为FIFO(最早的任务在列表头部) + redisTemplate.opsForList().rightPush(waitQueueKey, modelId); + log.info("未找到足够资源,任务 {} 加入等待队列", modelId); + return OptResult.error("资源不足,等待中"); + } + /** + * 从模型配置字符串中解析GPU内存需求 + * @param modelConfig 模型配置字符串,格式如 "GPUMemorySize:8000,version:1" + * @return 解析到的GPU内存大小(MB),若解析失败返回-1 + */ + private int parseGpuMemorySize(String modelConfig) { + if (modelConfig == null || modelConfig.isEmpty()) { + log.error("模型配置为空,无法解析GPU内存需求"); + return -1; + } + int requestMemorySize = 0; + String[] config = modelConfig.split(","); + for (String pair : config) { + // 按冒号分割键值对 + String[] keyValue = pair.split(":", 2); + if (keyValue.length == 2) { + String key = keyValue[0].trim(); + String value = keyValue[1].trim(); + // 匹配 GPUMemorySize 字段(忽略大小写) + if ("GPUMemorySize".equalsIgnoreCase(key)) { + try { + requestMemorySize = Integer.parseInt(value); + log.info("模型GPU内存: {} MB", requestMemorySize); + break; // 找到后即可退出循环 + } catch (NumberFormatException e) { + log.error("解析GPUMemorySize失败,值: {}", value, e); + return -1; + } + } + } + } + if (requestMemorySize <= 0) { + log.error("模型需求GPU内存未配置或无效"); + return -1; + } + return requestMemorySize; + } + +} diff --git a/src/main/java/com/bipt/intelligentapplicationorchestrationservice/mapper/PublishMapper.java b/src/main/java/com/bipt/intelligentapplicationorchestrationservice/mapper/PublishMapper.java index 5582b45..38c48a6 100644 --- a/src/main/java/com/bipt/intelligentapplicationorchestrationservice/mapper/PublishMapper.java +++ b/src/main/java/com/bipt/intelligentapplicationorchestrationservice/mapper/PublishMapper.java @@ -14,7 +14,6 @@ public interface PublishMapper { void insert(ServicePublishDTO servicePublishDTO); Long getByApiUrl(String apiUrl); - @Select("select model_config from model_version where model_id=#{modelId} and status = 1") - String getById(Long modelId); + } diff --git a/src/main/java/com/bipt/intelligentapplicationorchestrationservice/mapper/ServiceAPIMapper.java b/src/main/java/com/bipt/intelligentapplicationorchestrationservice/mapper/ServiceAPIMapper.java new file mode 100644 index 0000000..c27b445 --- /dev/null +++ b/src/main/java/com/bipt/intelligentapplicationorchestrationservice/mapper/ServiceAPIMapper.java @@ -0,0 +1,11 @@ +package com.bipt.intelligentapplicationorchestrationservice.mapper; + + +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Select; + +@Mapper +public interface ServiceAPIMapper { + @Select("select model_config from model_version where model_id=#{modelId} and status = 1") + String getById(Long modelId); +} diff --git a/src/main/java/com/bipt/intelligentapplicationorchestrationservice/service/Impl/PublishServiceImpl.java b/src/main/java/com/bipt/intelligentapplicationorchestrationservice/service/Impl/PublishServiceImpl.java index 52b422a..4788340 100644 --- a/src/main/java/com/bipt/intelligentapplicationorchestrationservice/service/Impl/PublishServiceImpl.java +++ b/src/main/java/com/bipt/intelligentapplicationorchestrationservice/service/Impl/PublishServiceImpl.java @@ -29,18 +29,12 @@ public class PublishServiceImpl implements PublishService { if (id != null){ throw new IllegalArgumentException("请求已存在: " + apiUrl); } + + //todo调用服务部署 + publishMapper.insert(servicePublishDTO); } - /** - * 根据id查找配置信息 - * @param modelId - * @return - */ - @Override - public String getByModelId(Long modelId) { - return publishMapper.getById(modelId); - } } diff --git a/src/main/java/com/bipt/intelligentapplicationorchestrationservice/service/Impl/ServiceAPIImpl.java b/src/main/java/com/bipt/intelligentapplicationorchestrationservice/service/Impl/ServiceAPIImpl.java new file mode 100644 index 0000000..b2a0d54 --- /dev/null +++ b/src/main/java/com/bipt/intelligentapplicationorchestrationservice/service/Impl/ServiceAPIImpl.java @@ -0,0 +1,23 @@ +package com.bipt.intelligentapplicationorchestrationservice.service.Impl; + +import com.bipt.intelligentapplicationorchestrationservice.mapper.ServiceAPIMapper; +import com.bipt.intelligentapplicationorchestrationservice.service.ServiceAPIService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +public class ServiceAPIImpl implements ServiceAPIService { + @Autowired + private ServiceAPIMapper serviceAPIMapper; + /** + * 根据id查找配置信息 + * @param modelId + * @return + */ + @Override + public String getByModelId(Long modelId) { + return serviceAPIMapper.getById(modelId); + }; +} diff --git a/src/main/java/com/bipt/intelligentapplicationorchestrationservice/service/PublishService.java b/src/main/java/com/bipt/intelligentapplicationorchestrationservice/service/PublishService.java index 4d3b410..851891f 100644 --- a/src/main/java/com/bipt/intelligentapplicationorchestrationservice/service/PublishService.java +++ b/src/main/java/com/bipt/intelligentapplicationorchestrationservice/service/PublishService.java @@ -9,5 +9,4 @@ public interface PublishService { - String getByModelId(Long modelId); } diff --git a/src/main/java/com/bipt/intelligentapplicationorchestrationservice/service/ServiceAPIService.java b/src/main/java/com/bipt/intelligentapplicationorchestrationservice/service/ServiceAPIService.java new file mode 100644 index 0000000..4c95bb4 --- /dev/null +++ b/src/main/java/com/bipt/intelligentapplicationorchestrationservice/service/ServiceAPIService.java @@ -0,0 +1,5 @@ +package com.bipt.intelligentapplicationorchestrationservice.service; + +public interface ServiceAPIService { + String getByModelId(Long modelId); +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index d8ff972..aa70b38 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -18,4 +18,10 @@ spring.data.redis.host=116.205.121.200 spring.data.redis.port=6379 spring.data.redis.password=Jbjhhzstsl97@ spring.data.redis.database=0 -spring.data.redis.timeout=3000 \ No newline at end of file +spring.data.redis.timeout=3000 + +# 服务路由配置 +spring.cloud.gateway.routes[0].id=request-service-route +spring.cloud.gateway.routes[0].uri=lb://intelligent-application-orchestration-service +spring.cloud.gateway.routes[0].predicates[0]=Path=/request +