错误处理
构建稳定可靠的N8N工作流,掌握全面的错误处理和恢复机制
错误处理
在实际的自动化工作流中,错误是不可避免的。网络故障、API限制、数据格式问题、系统异常等都可能导致工作流中断。建立完善的错误处理机制,是构建生产级工作流的关键。
🛡️ 错误处理策略
错误类型分类
| 错误类型 | 特征 | 处理策略 | 示例 |
|---|---|---|---|
| 网络错误 | 临时性、可重试 | 重试机制 | API超时、连接失败 |
| 认证错误 | 配置性、需修复 | 告警通知 | Token过期、权限不足 |
| 数据错误 | 输入性、可跳过 | 记录并继续 | 格式错误、缺失字段 |
| 逻辑错误 | 代码性、需修复 | 停止并告警 | 除零错误、空引用 |
| 资源错误 | 系统性、需等待 | 延时重试 | 内存不足、存储满 |
处理原则
- 快速失败: 不可恢复的错误立即停止
- 优雅降级: 次要功能失败不影响主流程
- 自动恢复: 临时性错误自动重试
- 详细记录: 记录错误上下文便于排查
- 用户友好: 提供清晰的错误信息
🔧 节点级别错误处理
Continue On Fail 配置
// 在节点设置中启用错误继续
Continue On Fail: true
// 这样即使节点失败,工作流也会继续执行
// 失败的节点会输出错误信息而不是停止整个流程
错误数据结构
// 当节点失败时的输出格式
{
"error": {
"message": "Request failed with status code 404",
"name": "Error",
"stack": "Error: Request failed...",
"httpCode": "404",
"cause": {
"url": "https://api.example.com/user/999",
"method": "GET"
}
}
}
IF 节点错误检查
// 检查上一个节点是否出错
{{ $json.error !== undefined }}
// 检查特定类型的错误
{{ $json.error && $json.error.httpCode === "404" }}
// 检查网络相关错误
{{ $json.error && $json.error.message.includes("timeout") }}
🔄 重试机制实现
简单重试逻辑
// Function 节点:基础重试机制
// Fixed pause between attempts, in milliseconds (2 seconds).
const retryDelay = 2000;
// Total attempts allowed before giving up.
const maxRetries = 3;
// Attempt number carried on the incoming item; any falsy value means this is attempt 1.
const currentAttempt = $json.attempt || 1;
/**
 * Performs one attempt of the external call.
 *
 * @returns {Promise<object>} `{ success: true, data, attempts }` on success, or, while
 *   attempts remain, a retry descriptor `{ success: false, retry: true, attempt, error, retryAt }`.
 * @throws {Error} once `maxRetries` attempts have failed; the last underlying error is
 *   attached as `cause` so its stack and details are not lost.
 */
async function performOperation() {
  try {
    // Main operation
    const result = await callExternalAPI($json.data);
    return {
      success: true,
      data: result,
      attempts: currentAttempt
    };
  } catch (error) {
    console.log(`Attempt ${currentAttempt} failed:`, error.message);
    if (currentAttempt < maxRetries) {
      // Hand back a retry descriptor; retryAt is presumably consumed by a downstream Wait node.
      return {
        success: false,
        retry: true,
        attempt: currentAttempt + 1,
        error: error.message,
        retryAt: new Date(Date.now() + retryDelay).toISOString()
      };
    }
    // Retry budget exhausted: fail for good, keeping the original error for debugging.
    throw new Error(`Operation failed after ${maxRetries} attempts: ${error.message}`, { cause: error });
  }
}
const result = await performOperation();
return [{ json: result }];
指数退避重试
// Function 节点:指数退避重试策略
// Backoff tuning in milliseconds: delays start at baseDelay and never exceed maxDelay.
const baseDelay = 1000; // 1 second
const maxDelay = 30000; // 30 seconds
// Total attempts allowed before the error is treated as final.
const maxRetries = 5;
// Attempt number carried on the incoming item; any falsy value means this is attempt 1.
const currentAttempt = $json.attempt || 1;
/**
 * Exponential backoff delay for an attempt number, with up to +10% random jitter.
 * With the defaults the sequence is ~1s, 2s, 4s, 8s, 16s, and then it is capped at 30s.
 *
 * @param {number} attempt - 1-based attempt number.
 * @param {number} [base=baseDelay] - Delay for attempt 1, in ms.
 * @param {number} [cap=maxDelay] - Upper bound in ms. It is applied after jitter,
 *   so the returned delay never exceeds it.
 * @returns {number} Whole milliseconds to wait.
 */
function calculateDelay(attempt, base = baseDelay, cap = maxDelay) {
  const delay = Math.min(base * 2 ** (attempt - 1), cap);
  // Random jitter spreads out callers that failed together, avoiding a retry stampede.
  const jitter = Math.random() * 0.1 * delay;
  return Math.floor(Math.min(delay + jitter, cap));
}
/**
 * Performs one attempt of the API call with exponential-backoff retry semantics.
 *
 * @returns {Promise<object>} `{ success: true, data, totalAttempts }` on success, or, for a
 *   retryable failure with attempts remaining, a retry descriptor
 *   `{ success: false, retry: true, attempt, error, nextRetryDelay, retryAt }`.
 * @throws {Error} for non-retryable errors, or once `maxRetries` is reached. The message is
 *   prefixed with the reason, and the original error is attached as `cause`.
 */
async function operationWithRetry() {
  try {
    const result = await executeAPICall($json);
    return {
      success: true,
      data: result,
      totalAttempts: currentAttempt
    };
  } catch (error) {
    const isRetryableError = checkIfRetryable(error);
    if (isRetryableError && currentAttempt < maxRetries) {
      const nextDelay = calculateDelay(currentAttempt + 1);
      console.log(`Attempt ${currentAttempt} failed, retrying in ${nextDelay}ms`);
      return {
        success: false,
        retry: true,
        attempt: currentAttempt + 1,
        error: error.message,
        nextRetryDelay: nextDelay,
        retryAt: new Date(Date.now() + nextDelay).toISOString()
      };
    }
    // Non-retryable error, or the retry budget is used up: keep the original error as the cause.
    throw new Error(
      `${isRetryableError ? 'Max retries exceeded' : 'Non-retryable error'}: ${error.message}`,
      { cause: error }
    );
  }
}
/**
 * Decides whether an error is transient and therefore worth retrying.
 *
 * The error is retryable when its message or its `code` matches a known transient keyword
 * (case-insensitive), or when its HTTP status is 429 or 5xx-gateway. The status is read from
 * `error.response.status` (axios-style) or from n8n's string `error.httpCode`.
 *
 * @param {object} error - Error-like object; a missing message or code is tolerated.
 * @returns {boolean}
 */
function checkIfRetryable(error) {
  // Keywords are lower-case because they are matched against a lower-cased message/code.
  const retryableErrors = [
    'timeout',
    'etimedout',
    'econnreset',
    'enotfound',
    'socket hang up',
    'network'
  ];
  const retryableHttpCodes = [429, 500, 502, 503, 504];

  const message = String(error?.message ?? '').toLowerCase();
  const code = String(error?.code ?? '').toLowerCase();
  const messageRetryable = retryableErrors.some(
    (keyword) => message.includes(keyword) || code === keyword
  );

  const status = error?.response?.status ?? Number(error?.httpCode);
  const httpCodeRetryable = retryableHttpCodes.includes(status);

  return messageRetryable || httpCodeRetryable;
}
const result = await operationWithRetry();
return [{ json: result }];
条件重试工作流
// 使用 Wait 节点实现延时重试
// 1. HTTP Request 节点 (Continue On Fail: true)
// 2. IF 节点检查是否需要重试
// `attempt` is unset on the first failure; default it to 0 (as the Set node below does) so the first retry happens
{{ $json.error && ($json.attempt || 0) < 3 }}
// True 分支:准备重试
// 3. Set 节点:增加重试计数
{
"attempt": "{{ ($json.attempt || 0) + 1 }}",
"originalData": "{{ $json.originalData || $json }}",
"lastError": "{{ $json.error.message }}",
"retryReason": "{{ $json.error.httpCode || 'Unknown error' }}"
}
// 4. Wait 节点:延时
Amount: "{{ Math.pow(2, $json.attempt) }}" // 指数退避
Unit: "seconds"
// 5. 连接回到 HTTP Request 节点形成循环
// False 分支:处理最终结果或错误
🚨 错误监控和告警
错误日志记录
// Function 节点:结构化错误日志
/**
 * Structured JSON logger for n8n Function/Code nodes.
 *
 * Every entry is written to the console as one JSON line. When $env.LOG_SERVICE_URL is set,
 * the entry is also POSTed to that URL without awaiting the result.
 *
 * @param {string} workflowId - Workflow identifier copied into every entry.
 * @param {string} executionId - Execution identifier copied into every entry.
 */
class ErrorLogger {
  constructor(workflowId, executionId) {
    this.workflowId = workflowId;
    this.executionId = executionId;
    // Fields shared by every entry. `timestamp` here is the logger's creation time;
    // log() overrides it with the time each individual entry is written.
    this.context = {
      timestamp: new Date().toISOString(),
      environment: $env.NODE_ENV || 'unknown',
      version: $env.WORKFLOW_VERSION || '1.0.0'
    };
  }

  /**
   * Builds and emits one log entry.
   *
   * @param {string} level - Severity label such as 'ERROR', 'WARN' or 'INFO'.
   * @param {string} message - Human-readable message.
   * @param {Error|null} [error=null] - Optional error. Its name, message, stack, code and
   *   `error.response.status` are copied into `entry.error`.
   * @param {object} [additionalData={}] - Extra fields merged in last, so they may override
   *   the standard fields.
   */
  log(level, message, error = null, additionalData = {}) {
    const logEntry = {
      level,
      message,
      workflowId: this.workflowId,
      executionId: this.executionId,
      ...this.context,
      // Stamp each entry when it is written, not when the logger was constructed.
      timestamp: new Date().toISOString(),
      ...additionalData
    };
    if (error) {
      logEntry.error = {
        name: error.name,
        message: error.message,
        stack: error.stack,
        code: error.code,
        httpStatus: error.response?.status
      };
    }
    // Console output
    console.log(JSON.stringify(logEntry));
    // Fire-and-forget: sendToLogService reports its own failures, so the promise is not awaited.
    void this.sendToLogService(logEntry);
  }

  /**
   * POSTs an entry to $env.LOG_SERVICE_URL when configured (e.g. ELK, Splunk or an
   * in-house service). Network errors and non-2xx responses are reported with
   * console.error and never rethrown.
   *
   * @param {object} logEntry - Entry built by log().
   */
  async sendToLogService(logEntry) {
    try {
      if ($env.LOG_SERVICE_URL) {
        const response = await fetch($env.LOG_SERVICE_URL, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify(logEntry)
        });
        // fetch resolves on HTTP errors, so a rejected entry must be detected explicitly.
        if (!response.ok) {
          console.error('Failed to send log to service:', `HTTP ${response.status}`);
        }
      }
    } catch (e) {
      console.error('Failed to send log to service:', e.message);
    }
  }

  /** Logs at ERROR level with the associated error object. */
  error(message, error, data = {}) {
    this.log('ERROR', message, error, data);
  }

  /** Logs at WARN level. */
  warn(message, data = {}) {
    this.log('WARN', message, null, data);
  }

  /** Logs at INFO level. */
  info(message, data = {}) {
    this.log('INFO', message, null, data);
  }
}
// 使用示例
const logger = new ErrorLogger($workflow.id, $execution.id);
try {
const result = await riskyOperation($json);
logger.info('Operation completed successfully', {
operation: 'user_creation',
userId: result.id
});
} catch (error) {
logger.error('Operation failed', error, {
operation: 'user_creation',
inputData: $json,
retryCount: $json.retryCount || 0
});
// 重新抛出错误或返回错误响应
throw error;
}
实时告警系统
// Function 节点:智能告警系统
/**
 * Evaluates workflow health statistics and fans alerts out to notification channels.
 *
 * Channel targets come from $env (ALERT_EMAIL, DINGTALK_WEBHOOK, SLACK_WEBHOOK). Channels
 * whose target is not configured are skipped. A failure on one channel is logged and does
 * not stop delivery to the others.
 */
class AlertManager {
  constructor() {
    this.alertThresholds = {
      error_rate: 0.1, // 10% error rate
      response_time: 5000, // 5 s average response time
      consecutive_fails: 3 // 3 consecutive failures
    };
    this.alertChannels = [
      { type: 'email', config: { to: $env.ALERT_EMAIL } },
      { type: 'dingtalk', config: { webhook: $env.DINGTALK_WEBHOOK } },
      { type: 'slack', config: { webhook: $env.SLACK_WEBHOOK } }
    ];
  }

  /**
   * Compares stats against the thresholds and sends one alert per breached rule.
   *
   * @param {object} workflowStats - Expects errorRate (0..1), avgResponseTime (ms) and consecutiveFails.
   * @param {object|null} [currentError=null] - Error from the current execution, if any.
   * @returns {Promise<object[]>} The alerts that were generated (and sent).
   */
  async checkAndAlert(workflowStats, currentError = null) {
    const alerts = [];
    // Error rate
    if (workflowStats.errorRate > this.alertThresholds.error_rate) {
      alerts.push({
        type: 'high_error_rate',
        severity: 'warning',
        message: `工作流错误率过高: ${(workflowStats.errorRate * 100).toFixed(1)}%`,
        data: workflowStats
      });
    }
    // Response time
    if (workflowStats.avgResponseTime > this.alertThresholds.response_time) {
      alerts.push({
        type: 'slow_response',
        severity: 'warning',
        message: `工作流响应时间过长: ${workflowStats.avgResponseTime}ms`,
        data: workflowStats
      });
    }
    // Consecutive failures
    if (workflowStats.consecutiveFails >= this.alertThresholds.consecutive_fails) {
      alerts.push({
        type: 'consecutive_failures',
        severity: 'critical',
        message: `工作流连续失败 ${workflowStats.consecutiveFails} 次`,
        data: workflowStats
      });
    }
    // Current error
    if (currentError && this.isCriticalError(currentError)) {
      alerts.push({
        type: 'critical_error',
        severity: 'critical',
        message: `严重错误: ${currentError.message}`,
        data: { error: currentError, stats: workflowStats }
      });
    }
    for (const alert of alerts) {
      await this.sendAlert(alert);
    }
    return alerts;
  }

  /**
   * True when the error message contains a critical keyword (case-insensitive).
   * Errors without a message are never critical.
   */
  isCriticalError(error) {
    const criticalKeywords = [
      'database connection',
      'payment failed',
      'security',
      'authentication failed',
      'data corruption'
    ];
    const message = String(error?.message ?? '').toLowerCase();
    return criticalKeywords.some((keyword) => message.includes(keyword));
  }

  /** Delivers one alert to every configured channel; per-channel failures are logged, not thrown. */
  async sendAlert(alert) {
    const alertMessage = this.formatAlertMessage(alert);
    for (const channel of this.alertChannels) {
      // Skip channels whose env var is unset instead of failing on an undefined target.
      if (!this.isChannelConfigured(channel)) {
        continue;
      }
      try {
        switch (channel.type) {
          case 'email':
            await this.sendEmailAlert(alertMessage, channel.config);
            break;
          case 'dingtalk':
            await this.sendDingTalkAlert(alertMessage, channel.config);
            break;
          case 'slack':
            await this.sendSlackAlert(alertMessage, channel.config);
            break;
        }
      } catch (error) {
        console.error(`Failed to send alert via ${channel.type}:`, error.message);
      }
    }
  }

  /** A channel is usable when it has a recipient address or a webhook URL. */
  isChannelConfigured(channel) {
    const { to, webhook } = channel.config ?? {};
    return Boolean(to || webhook);
  }

  /** Builds the channel-independent alert payload, tagged with the current workflow/execution IDs. */
  formatAlertMessage(alert) {
    const severityEmoji = {
      'info': 'ℹ️',
      'warning': '⚠️',
      'critical': '🚨'
    };
    return {
      // Unknown severities get no emoji rather than the text "undefined".
      title: `${severityEmoji[alert.severity] ?? ''} N8N工作流告警`,
      message: alert.message,
      severity: alert.severity,
      type: alert.type,
      timestamp: new Date().toISOString(),
      workflowId: $workflow.id,
      executionId: $execution.id,
      data: alert.data
    };
  }

  /** POSTs a JSON payload and throws when the endpoint answers with a non-2xx status. */
  async postJson(url, payload) {
    const response = await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload)
    });
    if (!response.ok) {
      throw new Error(`Webhook responded with HTTP ${response.status}`);
    }
    return response;
  }

  /** Sends the alert to a DingTalk robot webhook as a markdown message. */
  async sendDingTalkAlert(alert, config) {
    const message = {
      msgtype: 'markdown',
      markdown: {
        title: alert.title,
        text: `
### ${alert.title}
**告警级别**: ${alert.severity}
**告警类型**: ${alert.type}
**告警消息**: ${alert.message}
**时间**: ${alert.timestamp}
**工作流ID**: ${alert.workflowId}
**执行ID**: ${alert.executionId}
[查看详情](${$env.N8N_BASE_URL}/executions/${alert.executionId})
`
      }
    };
    await this.postJson(config.webhook, message);
  }

  /** Sends the alert to a Slack incoming webhook (payload `{ text }`, Slack mrkdwn). */
  async sendSlackAlert(alert, config) {
    const text = [
      `*${alert.title}*`,
      `*告警级别*: ${alert.severity}`,
      `*告警类型*: ${alert.type}`,
      `*告警消息*: ${alert.message}`,
      `*时间*: ${alert.timestamp}`,
      `*工作流ID*: ${alert.workflowId}`,
      `*执行ID*: ${alert.executionId}`,
      `<${$env.N8N_BASE_URL}/executions/${alert.executionId}|查看详情>`
    ].join('\n');
    await this.postJson(config.webhook, { text });
  }

  /**
   * Placeholder for email delivery: builds the HTML body only and does not send anything.
   * Hand the body to an email node (or SMTP client) to actually deliver it.
   */
  async sendEmailAlert(alert, config) {
    const emailBody = `
<h2>${alert.title}</h2>
<p><strong>告警消息:</strong> ${alert.message}</p>
<p><strong>严重级别:</strong> ${alert.severity}</p>
<p><strong>时间:</strong> ${alert.timestamp}</p>
<p><strong>工作流:</strong> ${alert.workflowId}</p>
`;
    // Send with an email node...
  }
}
// 使用示例
const alertManager = new AlertManager();
// 获取工作流统计数据
const workflowStats = {
errorRate: 0.15, // 15%错误率
avgResponseTime: 3000, // 3秒响应时间
consecutiveFails: 2, // 连续2次失败
totalExecutions: 100,
failedExecutions: 15
};
// 检查并发送告警
const alerts = await alertManager.checkAndAlert(workflowStats, $json.error);
console.log(`发送了 ${alerts.length} 个告警`);
🔁 断路器模式
// Function 节点:断路器实现
/**
 * Circuit breaker whose state survives between executions via node-scoped workflow static data.
 *
 * States:
 * - CLOSED: calls pass through.
 * - OPEN: calls are rejected until `recoveryTimeout` ms have passed since the last failure.
 * - HALF_OPEN: the next call is a trial. Success closes the breaker; failure re-opens it.
 *
 * @param {object} [options]
 * @param {number} [options.failureThreshold=5] - Failures needed to trip the breaker.
 * @param {number} [options.recoveryTimeout=60000] - ms to stay OPEN before allowing a trial call.
 * @param {number} [options.monitoringPeriod=300000] - If more than this many ms have passed
 *   since the previous failure, the failure count restarts instead of accumulating.
 */
class CircuitBreaker {
  constructor(options = {}) {
    this.failureThreshold = options.failureThreshold || 5;
    this.recoveryTimeout = options.recoveryTimeout || 60000; // 1 minute
    this.monitoringPeriod = options.monitoringPeriod || 300000; // 5 minutes

    // Restore the state persisted by earlier executions.
    const staticData = this.getStaticData();
    this.state = staticData.circuitState || 'CLOSED';
    this.failureCount = staticData.failureCount || 0;
    this.lastFailureTime = staticData.lastFailureTime || 0;
    this.lastSuccessTime = staticData.lastSuccessTime || Date.now();
  }

  /**
   * Returns the node-scoped workflow static data object.
   * NOTE(review): n8n documents `$getWorkflowStaticData` (Code node) and `getWorkflowStaticData`
   * (legacy Function node). `$node.getWorkflowStaticData` is kept only as a last-resort fallback.
   * Per the n8n docs, static data is persisted only for production executions — confirm this
   * for your version.
   */
  getStaticData() {
    if (typeof $getWorkflowStaticData === 'function') {
      return $getWorkflowStaticData('node');
    }
    if (typeof getWorkflowStaticData === 'function') {
      return getWorkflowStaticData('node');
    }
    return $node.getWorkflowStaticData('node');
  }

  /**
   * Runs `operation` through the breaker.
   *
   * @param {() => Promise<*>} operation - The protected call.
   * @returns {Promise<*>} The operation's result.
   * @throws {Error} immediately while OPEN; otherwise rethrows the operation's error after
   *   recording the failure.
   */
  async execute(operation) {
    this.updateState();
    if (this.state === 'OPEN') {
      throw new Error('Circuit breaker is OPEN - service is currently unavailable');
    }
    // CLOSED and HALF_OPEN both let the call through; its outcome drives the next transition.
    try {
      const result = await operation();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  /** Moves OPEN -> HALF_OPEN once the recovery timeout has elapsed. */
  updateState() {
    const now = Date.now();
    if (this.state === 'OPEN' && now - this.lastFailureTime > this.recoveryTimeout) {
      this.state = 'HALF_OPEN';
      console.log('Circuit breaker moved to HALF_OPEN state');
    }
  }

  /** Resets the failure count and closes a HALF_OPEN breaker. */
  onSuccess() {
    this.failureCount = 0;
    this.lastSuccessTime = Date.now();
    if (this.state === 'HALF_OPEN') {
      this.state = 'CLOSED';
      console.log('Circuit breaker moved to CLOSED state');
    }
    this.saveState();
  }

  /** Records a failure and opens the breaker when the threshold is hit or a trial call fails. */
  onFailure() {
    const now = Date.now();
    // A failure long after the previous one starts a fresh count (monitoring window).
    if (this.state === 'CLOSED' && now - this.lastFailureTime > this.monitoringPeriod) {
      this.failureCount = 0;
    }
    this.failureCount++;
    this.lastFailureTime = now;
    if (this.state === 'HALF_OPEN' || this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
      console.log(`Circuit breaker moved to OPEN state after ${this.failureCount} failures`);
    }
    this.saveState();
  }

  /** Writes the current state back to workflow static data. */
  saveState() {
    const staticData = this.getStaticData();
    staticData.circuitState = this.state;
    staticData.failureCount = this.failureCount;
    staticData.lastFailureTime = this.lastFailureTime;
    staticData.lastSuccessTime = this.lastSuccessTime;
  }

  /** Snapshot of the breaker state for diagnostics. */
  getStatus() {
    return {
      state: this.state,
      failureCount: this.failureCount,
      lastFailureTime: this.lastFailureTime,
      lastSuccessTime: this.lastSuccessTime
    };
  }
}
// 使用断路器保护外部API调用
// Breaker guarding the external API: trips after 3 failures and allows a trial call after 30 s.
const circuitBreaker = new CircuitBreaker({
  monitoringPeriod: 120000, // 2 minutes
  recoveryTimeout: 30000, // 30 seconds
  failureThreshold: 3
});
/**
 * POSTs `$json.requestData` to $env.EXTERNAL_API_URL and returns the parsed JSON response.
 *
 * @returns {Promise<*>} Parsed response body.
 * @throws {Error} when the response status is not 2xx, so the circuit breaker counts it as a failure.
 */
async function protectedAPICall() {
  const response = await fetch($env.EXTERNAL_API_URL, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify($json.requestData)
  });
  // fetch only rejects on network errors; HTTP errors must be turned into exceptions explicitly.
  if (!response.ok) {
    throw new Error(`API call failed with status ${response.status}`);
  }
  return response.json();
}
try {
const result = await circuitBreaker.execute(protectedAPICall);
return [{
json: {
success: true,
data: result,
circuitStatus: circuitBreaker.getStatus()
}
}];
} catch (error) {
return [{
json: {
success: false,
error: error.message,
circuitStatus: circuitBreaker.getStatus()
}
}];
}
🎯 实战案例:订单处理错误处理
业务场景
电商订单处理系统,需要处理支付、库存、物流等多个环节的潜在错误。
完整错误处理方案
// Function 节点:订单处理主流程
/**
 * Order pipeline for an n8n Function/Code node. The steps are: validate, check inventory,
 * take payment, update inventory, create the shipping task, then send the confirmation.
 *
 * Any thrown error is classified by its message and routed to a category-specific handler.
 * Requires ErrorLogger and AlertManager to be defined in the same node. Helpers such as
 * updateInventory, updateOrderStatus, notifyCustomer, releaseInventory and handleUnknownError
 * are not shown here and must be implemented.
 */
class OrderProcessor {
  constructor() {
    this.logger = new ErrorLogger('order-processing', $execution.id);
    this.alertManager = new AlertManager();
    this.maxRetries = 3;
    this.retryDelay = 2000; // base backoff delay in ms
  }

  /**
   * Runs the full pipeline for one order.
   *
   * @param {object} orderData - Incoming order item (orderId, customerId, items, optional retryCount).
   * @returns {Promise<object>} `{ success: true, orderId, status: 'completed', paymentId }`, or the
   *   result of handleOrderError when any step throws.
   *
   * NOTE(review): a step after processPayment (e.g. createShippingTask) may fail with a
   * network-classified error. The resulting retry re-enters this method and runs the payment
   * again. Make processPayment idempotent (e.g. keyed by orderId), or skip completed steps,
   * before relying on automatic retries.
   */
  async processOrder(orderData) {
    const orderId = orderData.orderId;
    this.logger.info('开始处理订单', { orderId });
    try {
      // 1. Validate the order payload
      await this.validateOrder(orderData);
      // 2. Check inventory
      await this.checkInventory(orderData);
      // 3. Take payment
      const paymentResult = await this.processPayment(orderData);
      // 4. Update inventory
      await this.updateInventory(orderData);
      // 5. Create the shipping task
      await this.createShippingTask(orderData, paymentResult);
      // 6. Send the confirmation notification
      await this.sendConfirmation(orderData);
      this.logger.info('订单处理完成', { orderId });
      return {
        success: true,
        orderId: orderId,
        status: 'completed',
        paymentId: paymentResult.transactionId
      };
    } catch (error) {
      return await this.handleOrderError(orderData, error);
    }
  }

  /** Logs the failure and dispatches to the handler for its error category. */
  async handleOrderError(orderData, error) {
    const orderId = orderData.orderId;
    this.logger.error('订单处理失败', error, { orderId });
    const errorType = this.classifyError(error);
    switch (errorType) {
      case 'PAYMENT_FAILED':
        return await this.handlePaymentError(orderData, error);
      case 'INVENTORY_INSUFFICIENT':
        return await this.handleInventoryError(orderData, error);
      case 'NETWORK_ERROR':
        return await this.handleNetworkError(orderData, error);
      case 'SYSTEM_ERROR':
        return await this.handleSystemError(orderData, error);
      default:
        return await this.handleUnknownError(orderData, error);
    }
  }

  /** Payment failure: mark the order, release inventory, notify the customer, maybe schedule a retry. */
  async handlePaymentError(orderData, error) {
    const orderId = orderData.orderId;
    // 1. Update the order status
    await this.updateOrderStatus(orderId, 'payment_failed');
    // 2. Release inventory
    // NOTE(review): inventory is only updated in step 4 of processOrder, after payment succeeds.
    // Unless checkInventory reserves stock, there may be nothing to release here — confirm.
    await this.releaseInventory(orderData);
    // 3. Notify the customer
    await this.notifyCustomer(orderData, 'payment_failed', {
      message: '支付处理失败,请重新尝试或联系客服',
      retryUrl: `${$env.FRONTEND_URL}/orders/${orderId}/retry`
    });
    // 4. Schedule a payment retry for transient failures. Evaluated once so the scheduling
    // decision and the reported `retryable` flag always agree.
    const retryable = this.isRetryablePaymentError(error);
    if (retryable) {
      await this.schedulePaymentRetry(orderData);
    }
    return {
      success: false,
      orderId: orderId,
      status: 'payment_failed',
      error: error.message,
      retryable
    };
  }

  /** Out of stock: mark the order, offer alternatives to the customer, alert purchasing. */
  async handleInventoryError(orderData, error) {
    const orderId = orderData.orderId;
    await this.updateOrderStatus(orderId, 'inventory_insufficient');
    // Notify the customer with alternative products
    await this.notifyCustomer(orderData, 'inventory_insufficient', {
      message: '商品库存不足',
      alternatives: await this.findAlternativeProducts(orderData.items)
    });
    // Notify the purchasing team
    await this.notifyPurchasing(orderData);
    return {
      success: false,
      orderId: orderId,
      status: 'inventory_insufficient',
      error: error.message,
      retryable: false
    };
  }

  /**
   * Network failure: return a retry descriptor while retries remain; otherwise mark the order
   * failed, notify the customer and raise a critical alert.
   *
   * The retry descriptor carries the original order fields, so when it is fed back into
   * processOrder it still passes validateOrder and keeps counting retryCount.
   */
  async handleNetworkError(orderData, error) {
    const orderId = orderData.orderId;
    const retryCount = orderData.retryCount || 0;
    if (retryCount < this.maxRetries) {
      this.logger.warn('网络错误,准备重试', {
        orderId,
        retryCount: retryCount + 1
      });
      const retryDelay = this.calculateRetryDelay(retryCount + 1);
      return {
        ...orderData,
        success: false,
        orderId: orderId,
        status: 'retrying',
        error: error.message,
        retryable: true,
        retryCount: retryCount + 1,
        retryAt: new Date(Date.now() + retryDelay).toISOString()
      };
    }
    // Retries exhausted: mark as failed
    await this.updateOrderStatus(orderId, 'network_failed');
    await this.notifyCustomer(orderData, 'processing_failed');
    await this.alertManager.sendAlert({
      type: 'order_processing_failed',
      severity: 'critical',
      message: `订单 ${orderId} 在网络重试 ${this.maxRetries} 次后仍然失败`,
      data: { orderId, error: error.message }
    });
    return {
      success: false,
      orderId: orderId,
      status: 'failed',
      error: `网络错误重试 ${this.maxRetries} 次后仍然失败`,
      retryable: false
    };
  }

  /** System failure: alert immediately, park the order, and request manual intervention. */
  async handleSystemError(orderData, error) {
    const orderId = orderData.orderId;
    await this.alertManager.sendAlert({
      type: 'system_error',
      severity: 'critical',
      message: `系统错误导致订单 ${orderId} 处理失败`,
      data: { orderId, error: error.message, stack: error.stack }
    });
    await this.updateOrderStatus(orderId, 'system_error');
    await this.notifySupport(orderData, error);
    return {
      success: false,
      orderId: orderId,
      status: 'system_error',
      error: error.message,
      retryable: false,
      requiresManualIntervention: true
    };
  }

  /**
   * Maps an error to a category by keyword matching on its message (case-insensitive).
   * Checks run in order: payment, inventory, network, system. Errors without a message
   * are 'UNKNOWN_ERROR'.
   */
  classifyError(error) {
    const message = String(error?.message ?? '').toLowerCase();
    if (message.includes('payment') || message.includes('transaction')) {
      return 'PAYMENT_FAILED';
    }
    if (message.includes('inventory') || message.includes('stock')) {
      return 'INVENTORY_INSUFFICIENT';
    }
    if (message.includes('network') || message.includes('timeout') ||
        message.includes('connection')) {
      return 'NETWORK_ERROR';
    }
    if (message.includes('database') || message.includes('system') ||
        message.includes('internal')) {
      return 'SYSTEM_ERROR';
    }
    return 'UNKNOWN_ERROR';
  }

  /**
   * Exponential backoff from `this.retryDelay`, with up to +10% jitter, capped at 30 s.
   * @param {number} retryCount - 1-based retry number.
   * @returns {number} Whole milliseconds to wait.
   */
  calculateRetryDelay(retryCount) {
    const baseDelay = this.retryDelay * Math.pow(2, retryCount - 1);
    const maxDelay = 30000; // 30 seconds
    const jitter = Math.random() * 0.1 * baseDelay;
    return Math.floor(Math.min(baseDelay + jitter, maxDelay));
  }

  // Other helper methods...
  /** Throws when the order ID, customer ID or item list is missing. */
  async validateOrder(orderData) {
    if (!orderData.orderId) throw new Error('订单ID缺失');
    if (!orderData.customerId) throw new Error('客户ID缺失');
    if (!orderData.items || orderData.items.length === 0) {
      throw new Error('订单商品列表为空');
    }
    // More validation logic...
  }

  async checkInventory(orderData) {
    // Inventory check logic...
  }

  async processPayment(orderData) {
    // Payment logic...
  }

  // ... other method implementations
}
// 主处理逻辑
const processor = new OrderProcessor();
const result = await processor.processOrder($json);
return [{ json: result }];
📋 错误处理最佳实践
1. 分层错误处理
// 应用层错误处理
// Branch on the error's class and handle only the kinds you expect.
// NOTE(review): BusinessError and ValidationError are assumed to be custom Error subclasses
// defined elsewhere; `instanceof` throws a ReferenceError if they are not in scope.
try {
  // Business logic
} catch (error) {
  if (error instanceof BusinessError) {
    // Business-rule violation: handle it / report it to the user
  } else if (error instanceof ValidationError) {
    // Invalid input: handle it / report it to the user
  } else {
    // Unexpected system error: log it, then rethrow so it is not silently swallowed
    throw error;
  }
}
2. 错误边界设置
// 在关键节点设置错误边界
// Operations where any failure must stop the workflow instead of degrading.
const criticalOperations = [
  'payment_processing',
  'data_persistence',
  'external_api_calls'
];
const isCriticalOperation = criticalOperations.includes($json.operationType);
if (!isCriticalOperation) {
  // Non-critical: degraded handling is acceptable
} else {
  // Critical: apply strict error handling and do not continue execution
}
3. 用户友好的错误信息
/**
 * Maps an error's `code` to a customer-facing message.
 *
 * Unknown codes, a missing code, and a null/undefined error all fall back to a generic message.
 *
 * @param {{ code?: string } | null | undefined} error
 * @returns {string}
 */
function formatUserError(error) {
  const userFriendlyMessages = {
    'PAYMENT_DECLINED': '您的支付被拒绝,请检查银行卡信息或联系银行',
    'NETWORK_TIMEOUT': '网络连接超时,请稍后重试',
    'INVENTORY_INSUFFICIENT': '商品库存不足,我们将尽快补货',
    'SYSTEM_MAINTENANCE': '系统正在维护中,请稍后再试'
  };
  const code = error?.code;
  // Object.hasOwn ignores inherited keys such as "toString", which would otherwise return a function.
  return Object.hasOwn(userFriendlyMessages, code)
    ? userFriendlyMessages[code]
    : '操作失败,请联系客服';
}
🚀 下一步学习
掌握错误处理后,继续学习:
完善的错误处理是生产级工作流的基石。通过系统性的错误处理策略,你能构建出健壮、可靠的自动化系统,即使在异常情况下也能优雅地运行!