refactor: 重构部署功能
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
import { PipelineRunner } from '../runners/index.ts';
|
||||
import { prisma } from './prisma.ts';
|
||||
import { log } from '../libs/logger.ts';
|
||||
|
||||
const TAG = 'Queue';
|
||||
// 存储正在运行的部署任务
|
||||
const runningDeployments = new Set<number>();
|
||||
|
||||
@@ -40,14 +42,14 @@ export class ExecutionQueue {
|
||||
* 初始化执行队列,包括恢复未完成的任务
|
||||
*/
|
||||
public async initialize(): Promise<void> {
|
||||
console.log('Initializing execution queue...');
|
||||
log.info(TAG, 'Initializing execution queue...');
|
||||
// 恢复未完成的任务
|
||||
await this.recoverPendingDeployments();
|
||||
|
||||
// 启动定时轮询
|
||||
this.startPolling();
|
||||
|
||||
console.log('Execution queue initialized');
|
||||
log.info(TAG, 'Execution queue initialized');
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -55,7 +57,7 @@ export class ExecutionQueue {
|
||||
*/
|
||||
private async recoverPendingDeployments(): Promise<void> {
|
||||
try {
|
||||
console.log('Recovering pending deployments from database...');
|
||||
log.info(TAG, 'Recovering pending deployments from database...');
|
||||
|
||||
// 查询数据库中状态为pending的部署任务
|
||||
const pendingDeployments = await prisma.deployment.findMany({
|
||||
@@ -69,16 +71,16 @@ export class ExecutionQueue {
|
||||
},
|
||||
});
|
||||
|
||||
console.log(`Found ${pendingDeployments.length} pending deployments`);
|
||||
log.info(TAG, `Found ${pendingDeployments.length} pending deployments`);
|
||||
|
||||
// 将这些任务添加到执行队列中
|
||||
for (const deployment of pendingDeployments) {
|
||||
await this.addTask(deployment.id, deployment.pipelineId);
|
||||
}
|
||||
|
||||
console.log('Pending deployments recovery completed');
|
||||
log.info(TAG, 'Pending deployments recovery completed');
|
||||
} catch (error) {
|
||||
console.error('Failed to recover pending deployments:', error);
|
||||
log.error(TAG, 'Failed to recover pending deployments:', error);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,12 +89,12 @@ export class ExecutionQueue {
|
||||
*/
|
||||
private startPolling(): void {
|
||||
if (this.isPolling) {
|
||||
console.log('Polling is already running');
|
||||
log.info(TAG, 'Polling is already running');
|
||||
return;
|
||||
}
|
||||
|
||||
this.isPolling = true;
|
||||
console.log(`Starting polling with interval ${POLLING_INTERVAL}ms`);
|
||||
log.info(TAG, `Starting polling with interval ${POLLING_INTERVAL}ms`);
|
||||
|
||||
// 立即执行一次检查
|
||||
this.checkPendingDeployments();
|
||||
@@ -111,7 +113,7 @@ export class ExecutionQueue {
|
||||
clearInterval(pollingTimer);
|
||||
pollingTimer = null;
|
||||
this.isPolling = false;
|
||||
console.log('Polling stopped');
|
||||
log.info(TAG, 'Polling stopped');
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,7 +122,7 @@ export class ExecutionQueue {
|
||||
*/
|
||||
private async checkPendingDeployments(): Promise<void> {
|
||||
try {
|
||||
console.log('Checking for pending deployments in database...');
|
||||
log.info(TAG, 'Checking for pending deployments in database...');
|
||||
|
||||
// 查询数据库中状态为pending的部署任务
|
||||
const pendingDeployments = await prisma.deployment.findMany({
|
||||
@@ -134,7 +136,8 @@ export class ExecutionQueue {
|
||||
},
|
||||
});
|
||||
|
||||
console.log(
|
||||
log.info(
|
||||
TAG,
|
||||
`Found ${pendingDeployments.length} pending deployments in polling`,
|
||||
);
|
||||
|
||||
@@ -142,14 +145,15 @@ export class ExecutionQueue {
|
||||
for (const deployment of pendingDeployments) {
|
||||
// 检查是否已经在运行队列中
|
||||
if (!runningDeployments.has(deployment.id)) {
|
||||
console.log(
|
||||
log.info(
|
||||
TAG,
|
||||
`Adding deployment ${deployment.id} to queue from polling`,
|
||||
);
|
||||
await this.addTask(deployment.id, deployment.pipelineId);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Failed to check pending deployments:', error);
|
||||
log.error(TAG, 'Failed to check pending deployments:', error);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,7 +168,7 @@ export class ExecutionQueue {
|
||||
): Promise<void> {
|
||||
// 检查是否已经在运行队列中
|
||||
if (runningDeployments.has(deploymentId)) {
|
||||
console.log(`Deployment ${deploymentId} is already queued or running`);
|
||||
log.info(TAG, `Deployment ${deploymentId} is already queued or running`);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -194,7 +198,7 @@ export class ExecutionQueue {
|
||||
// 执行流水线
|
||||
await this.executePipeline(task.deploymentId, task.pipelineId);
|
||||
} catch (error) {
|
||||
console.error('执行流水线失败:', error);
|
||||
log.error(TAG, '执行流水线失败:', error);
|
||||
// 这里可以添加更多的错误处理逻辑
|
||||
} finally {
|
||||
// 从运行队列中移除
|
||||
@@ -245,7 +249,7 @@ export class ExecutionQueue {
|
||||
);
|
||||
await runner.run(pipelineId);
|
||||
} catch (error) {
|
||||
console.error('执行流水线失败:', error);
|
||||
log.error(TAG, '执行流水线失败:', error);
|
||||
// 错误处理可以在这里添加,比如更新部署状态为失败
|
||||
throw error;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user