Logon8n中文教程

错误处理

构建稳定可靠的N8N工作流,掌握全面的错误处理和恢复机制

错误处理

在实际的自动化工作流中,错误是不可避免的。网络故障、API限制、数据格式问题、系统异常等都可能导致工作流中断。建立完善的错误处理机制,是构建生产级工作流的关键。

🛡️ 错误处理策略

错误类型分类

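| 错误类型 | 特征 | 处理策略 | 示例 |
| --- | --- | --- | --- |
| 网络错误 | 临时性、可重试 | 重试机制 | API超时、连接失败 |
| 认证错误 | 配置性、需修复 | 告警通知 | Token过期、权限不足 |
| 数据错误 | 输入性、可跳过 | 记录并继续 | 格式错误、缺失字段 |
| 逻辑错误 | 代码性、需修复 | 停止并告警 | 除零错误、空引用 |
| 资源错误 | 系统性、需等待 | 延时重试 | 内存不足、存储满 |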

处理原则

  1. 快速失败: 不可恢复的错误立即停止
  2. 优雅降级: 次要功能失败不影响主流程
  3. 自动恢复: 临时性错误自动重试
  4. 详细记录: 记录错误上下文便于排查
  5. 用户友好: 提供清晰的错误信息

🔧 节点级别错误处理

Continue On Fail 配置

// 在节点设置中启用错误继续
Continue On Fail: true

// 这样即使节点失败,工作流也会继续执行
// 失败的节点会输出错误信息而不是停止整个流程

错误数据结构

// 当节点失败时的输出格式
{
  "error": {
    "message": "Request failed with status code 404",
    "name": "Error",
    "stack": "Error: Request failed...",
    "httpCode": "404",
    "cause": {
      "url": "https://api.example.com/user/999",
      "method": "GET"
    }
  }
}

IF 节点错误检查

// 检查上一个节点是否出错
{{ $json.error !== undefined }}

// 检查特定类型的错误
{{ $json.error && $json.error.httpCode === "404" }}

// 检查网络相关错误
{{ $json.error && $json.error.message.includes("timeout") }}

🔄 重试机制实现

简单重试逻辑

// Function node: basic retry mechanism
// Number of attempts allowed before performOperation() rethrows the failure.
const maxRetries = 3;
const retryDelay = 2000; // 2 seconds
// Attempt counter read from the incoming item; a missing or falsy value counts as attempt 1.
const currentAttempt = $json.attempt || 1;

/**
 * Calls the external API once with `$json.data` and reports the outcome.
 *
 * @returns {Promise<object>} On success `{ success: true, data, attempts }`.
 *   On a failure with attempts remaining, a retry descriptor
 *   `{ success: false, retry: true, attempt, error, retryAt }`.
 * @throws {Error} When the call fails and `currentAttempt` is not below `maxRetries`.
 */
async function performOperation() {
  let payload;

  try {
    // Run the main operation
    payload = await callExternalAPI($json.data);
  } catch (failure) {
    console.log(`Attempt ${currentAttempt} failed:`, failure.message);

    const hasAttemptsLeft = currentAttempt < maxRetries;
    if (!hasAttemptsLeft) {
      // Retries exhausted: final failure
      throw new Error(`Operation failed after ${maxRetries} attempts: ${failure.message}`);
    }

    // Describe the next attempt; the function itself does not wait
    const nextAttempt = currentAttempt + 1;
    const retryTimestamp = new Date(Date.now() + retryDelay).toISOString();
    return {
      success: false,
      retry: true,
      attempt: nextAttempt,
      error: failure.message,
      retryAt: retryTimestamp
    };
  }

  return {
    success: true,
    data: payload,
    attempts: currentAttempt
  };
}

const result = await performOperation();
return [{ json: result }];

指数退避重试

// Function node: exponential backoff retry strategy
const maxRetries = 5;
const baseDelay = 1000; // base delay: 1 second
const maxDelay = 30000; // maximum delay: 30 seconds
// Attempt counter read from the incoming item; a missing or falsy value counts as attempt 1.
const currentAttempt = $json.attempt || 1;

/**
 * Computes the wait before the given attempt using capped exponential backoff
 * with up to 10% random jitter.
 *
 * The result never exceeds `maxDelay`. When adding the jitter would cross the
 * cap, the jitter is subtracted instead, so retries at the cap stay spread out
 * rather than all landing on exactly `maxDelay`.
 *
 * @param {number} attempt - 1-based attempt number the delay is for.
 * @returns {number} Delay in whole milliseconds, at most `maxDelay`.
 */
function calculateDelay(attempt) {
  // Exponential backoff: baseDelay, 2x, 4x, 8x, ... capped at maxDelay
  const delay = Math.min(baseDelay * Math.pow(2, attempt - 1), maxDelay);

  // Random jitter so that many clients do not retry at the same instant
  const jitter = Math.random() * 0.1 * delay;

  // Keep the jittered value inside the cap
  const jittered = delay + jitter <= maxDelay ? delay + jitter : delay - jitter;
  return Math.floor(jittered);
}

/**
 * Executes the API call once with `$json` and either returns its result,
 * returns a descriptor for the next retry, or throws.
 *
 * @returns {Promise<object>} `{ success: true, data, totalAttempts }` on success,
 *   otherwise `{ success: false, retry: true, attempt, error, nextRetryDelay, retryAt }`.
 * @throws {Error} When the failure is not retryable, or `currentAttempt` is not
 *   below `maxRetries`.
 */
async function operationWithRetry() {
  let response;

  try {
    response = await executeAPICall($json);
  } catch (failure) {
    const retryable = checkIfRetryable(failure);
    const mayRetry = retryable && currentAttempt < maxRetries;

    if (!mayRetry) {
      // Either the error is not retryable or the retry budget is spent
      const reason = retryable ? 'Max retries exceeded' : 'Non-retryable error';
      throw new Error(`${reason}: ${failure.message}`);
    }

    const upcomingAttempt = currentAttempt + 1;
    const nextDelay = calculateDelay(upcomingAttempt);

    console.log(`Attempt ${currentAttempt} failed, retrying in ${nextDelay}ms`);

    const retryTimestamp = new Date(Date.now() + nextDelay).toISOString();
    return {
      success: false,
      retry: true,
      attempt: upcomingAttempt,
      error: failure.message,
      nextRetryDelay: nextDelay,
      retryAt: retryTimestamp
    };
  }

  return {
    success: true,
    data: response,
    totalAttempts: currentAttempt
  };
}

/**
 * Decides whether a failure is worth retrying.
 *
 * A failure is retryable when its message or `code` contains one of the known
 * transient-network keywords (compared case-insensitively), or when
 * `error.response.status` is one of the retryable HTTP status codes.
 *
 * @param {object} error - The caught error; `message`, `code` and `response` are all optional.
 * @returns {boolean} `true` when the failure looks transient, otherwise `false`.
 */
function checkIfRetryable(error) {
  // Keywords are lower-case because the searched text is lower-cased first
  const retryableKeywords = [
    'timeout',
    'econnreset',
    'enotfound',
    'socket hang up',
    'network'
  ];

  const retryableHttpCodes = [429, 500, 502, 503, 504];

  // Search both the message and the error code (e.g. `code: 'ECONNRESET'`)
  const searchableText = `${error?.message ?? ''} ${error?.code ?? ''}`.toLowerCase();
  const messageRetryable = retryableKeywords.some((keyword) =>
    searchableText.includes(keyword)
  );

  // Check the HTTP status code when a response is attached
  const httpCodeRetryable = retryableHttpCodes.includes(error?.response?.status);

  return messageRetryable || httpCodeRetryable;
}

const result = await operationWithRetry();
return [{ json: result }];

条件重试工作流

// 使用 Wait 节点实现延时重试
// 1. HTTP Request 节点 (Continue On Fail: true)
// 2. IF 节点检查是否需要重试
{{ $json.error && $json.attempt < 3 }}

// True 分支:准备重试
// 3. Set 节点:增加重试计数
{
  "attempt": "{{ ($json.attempt || 0) + 1 }}",
  "originalData": "{{ $json.originalData || $json }}",
  "lastError": "{{ $json.error.message }}",
  "retryReason": "{{ $json.error.httpCode || 'Unknown error' }}"
}

// 4. Wait 节点:延时
Amount: "{{ Math.pow(2, $json.attempt) }}"  // 指数退避
Unit: "seconds"

// 5. 连接回到 HTTP Request 节点形成循环

// False 分支:处理最终结果或错误

🚨 错误监控和告警

错误日志记录

// Function node: structured error logging
/**
 * Structured logger that writes one JSON line per entry to the console and,
 * when `$env.LOG_SERVICE_URL` is set, also POSTs the entry to that URL.
 *
 * Reads `$env` from the surrounding scope.
 */
class ErrorLogger {
  /**
   * @param {string} workflowId - Identifier stamped on every entry.
   * @param {string} executionId - Identifier stamped on every entry.
   */
  constructor(workflowId, executionId) {
    this.workflowId = workflowId;
    this.executionId = executionId;
    this.context = {
      // Time the logger was created; log() stamps every entry with its own time
      timestamp: new Date().toISOString(),
      environment: $env.NODE_ENV || 'unknown',
      version: $env.WORKFLOW_VERSION || '1.0.0'
    };
  }

  /**
   * Builds a log entry, prints it, and forwards it to the log service.
   *
   * @param {string} level - Severity label, e.g. 'ERROR'.
   * @param {string} message - Human-readable message.
   * @param {?Error} [error=null] - Optional error whose name, message, stack,
   *   code and HTTP status are copied into the entry.
   * @param {object} [additionalData={}] - Extra fields merged last, so they
   *   override same-named built-in fields.
   * @returns {Promise<void>} Settles once delivery to the log service has
   *   finished; it never rejects, so awaiting it is optional.
   */
  log(level, message, error = null, additionalData = {}) {
    const logEntry = {
      level: level,
      message: message,
      workflowId: this.workflowId,
      executionId: this.executionId,
      ...this.context,
      // Stamp the entry with the time of this call, not the constructor's
      timestamp: new Date().toISOString(),
      ...additionalData
    };

    if (error) {
      logEntry.error = {
        name: error.name,
        message: error.message,
        stack: error.stack,
        code: error.code,
        httpStatus: error.response?.status
      };
    }

    // Write to the console
    console.log(JSON.stringify(logEntry));

    // Optional: forward to the log service; return the promise so callers may await it
    return this.sendToLogService(logEntry);
  }

  /**
   * POSTs the entry to `$env.LOG_SERVICE_URL` when that variable is set.
   * Delivery is best-effort: failures are reported on the console and never thrown.
   *
   * @param {object} logEntry - The entry to send.
   * @returns {Promise<void>}
   */
  async sendToLogService(logEntry) {
    try {
      // Send to an external log service (e.g. ELK, Splunk, or a self-hosted one)
      if ($env.LOG_SERVICE_URL) {
        const response = await fetch($env.LOG_SERVICE_URL, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify(logEntry)
        });
        if (!response.ok) {
          console.error('Failed to send log to service:', `HTTP ${response.status}`);
        }
      }
    } catch (e) {
      console.error('Failed to send log to service:', e.message);
    }
  }

  /** Logs at ERROR level with the given error attached. */
  error(message, error, data = {}) {
    return this.log('ERROR', message, error, data);
  }

  /** Logs at WARN level. */
  warn(message, data = {}) {
    return this.log('WARN', message, null, data);
  }

  /** Logs at INFO level. */
  info(message, data = {}) {
    return this.log('INFO', message, null, data);
  }
}

// Usage example
const logger = new ErrorLogger($workflow.id, $execution.id);
const operation = 'user_creation';

try {
  const created = await riskyOperation($json);
  const successDetails = { operation, userId: created.id };
  logger.info('Operation completed successfully', successDetails);
} catch (error) {
  const failureDetails = {
    operation,
    inputData: $json,
    retryCount: $json.retryCount || 0
  };
  logger.error('Operation failed', error, failureDetails);

  // Rethrow after logging (an error response could be returned here instead)
  throw error;
}

实时告警系统

// Function node: smart alerting system
/**
 * Checks workflow statistics against fixed thresholds and sends the resulting
 * alerts to every configured notification channel.
 *
 * Reads `$env`, `$workflow` and `$execution` from the surrounding scope.
 */
class AlertManager {
  constructor() {
    this.alertThresholds = {
      error_rate: 0.1,        // 10% error rate
      response_time: 5000,    // 5 second response time
      consecutive_fails: 3    // 3 consecutive failures
    };

    this.alertChannels = [
      { type: 'email', config: { to: $env.ALERT_EMAIL } },
      { type: 'dingtalk', config: { webhook: $env.DINGTALK_WEBHOOK } },
      { type: 'slack', config: { webhook: $env.SLACK_WEBHOOK } }
    ];
  }

  /**
   * Evaluates the statistics (and optionally the current error) and sends one
   * alert per violated rule.
   *
   * @param {object} workflowStats - Reads `errorRate`, `avgResponseTime` and `consecutiveFails`.
   * @param {?object} [currentError=null] - Error checked against the critical keywords.
   * @returns {Promise<object[]>} The alerts that were raised, in rule order.
   */
  async checkAndAlert(workflowStats, currentError = null) {
    const alerts = [];

    // Error rate
    if (workflowStats.errorRate > this.alertThresholds.error_rate) {
      alerts.push({
        type: 'high_error_rate',
        severity: 'warning',
        message: `工作流错误率过高: ${(workflowStats.errorRate * 100).toFixed(1)}%`,
        data: workflowStats
      });
    }

    // Response time
    if (workflowStats.avgResponseTime > this.alertThresholds.response_time) {
      alerts.push({
        type: 'slow_response',
        severity: 'warning',
        message: `工作流响应时间过长: ${workflowStats.avgResponseTime}ms`,
        data: workflowStats
      });
    }

    // Consecutive failures
    if (workflowStats.consecutiveFails >= this.alertThresholds.consecutive_fails) {
      alerts.push({
        type: 'consecutive_failures',
        severity: 'critical',
        message: `工作流连续失败 ${workflowStats.consecutiveFails} 次`,
        data: workflowStats
      });
    }

    // Current error
    if (currentError && this.isCriticalError(currentError)) {
      alerts.push({
        type: 'critical_error',
        severity: 'critical',
        message: `严重错误: ${currentError.message}`,
        data: { error: currentError, stats: workflowStats }
      });
    }

    // Send sequentially, one alert at a time
    for (const alert of alerts) {
      await this.sendAlert(alert);
    }

    return alerts;
  }

  /**
   * @param {object} error - Error-like value; a missing `message` counts as not critical.
   * @returns {boolean} Whether the message contains one of the critical keywords.
   */
  isCriticalError(error) {
    const criticalKeywords = [
      'database connection',
      'payment failed',
      'security',
      'authentication failed',
      'data corruption'
    ];

    const message = String(error?.message ?? '').toLowerCase();
    return criticalKeywords.some((keyword) => message.includes(keyword));
  }

  /**
   * Formats the alert and sends it through every configured channel.
   * A failure on one channel is logged and does not stop the others.
   *
   * @param {object} alert - `{ type, severity, message, data }`.
   * @returns {Promise<void>}
   */
  async sendAlert(alert) {
    const alertMessage = this.formatAlertMessage(alert);

    for (const channel of this.alertChannels) {
      // Skip channels whose address or webhook was never configured
      if (!this.isChannelConfigured(channel)) {
        continue;
      }

      try {
        switch (channel.type) {
          case 'email':
            await this.sendEmailAlert(alertMessage, channel.config);
            break;
          case 'dingtalk':
            await this.sendDingTalkAlert(alertMessage, channel.config);
            break;
          case 'slack':
            await this.sendSlackAlert(alertMessage, channel.config);
            break;
        }
      } catch (error) {
        console.error(`Failed to send alert via ${channel.type}:`, error.message);
      }
    }
  }

  /** Returns whether the channel has the target (`to` or `webhook`) it needs. */
  isChannelConfigured(channel) {
    const { to, webhook } = channel.config || {};
    return Boolean(channel.type === 'email' ? to : webhook);
  }

  /**
   * Wraps an alert with title, timestamp and workflow/execution identifiers.
   *
   * @param {object} alert - `{ type, severity, message, data }`.
   * @returns {object} The channel-independent alert message.
   */
  formatAlertMessage(alert) {
    const severityEmoji = {
      'info': 'ℹ️',
      'warning': '⚠️',
      'critical': '🚨'
    };

    // Fall back to a neutral icon so an unknown severity does not print "undefined"
    const icon = severityEmoji[alert.severity] ?? '🔔';

    return {
      title: `${icon} N8N工作流告警`,
      message: alert.message,
      severity: alert.severity,
      type: alert.type,
      timestamp: new Date().toISOString(),
      workflowId: $workflow.id,
      executionId: $execution.id,
      data: alert.data
    };
  }

  /**
   * POSTs a JSON payload to a webhook.
   *
   * @param {string} url - Webhook URL.
   * @param {object} payload - Body, serialised as JSON.
   * @throws {Error} When the webhook answers with a non-2xx status.
   */
  async postJson(url, payload) {
    const response = await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload)
    });

    if (!response.ok) {
      throw new Error(`Webhook responded with status ${response.status}`);
    }
  }

  /** Sends the alert to a DingTalk webhook as a markdown message. */
  async sendDingTalkAlert(alert, config) {
    const message = {
      msgtype: 'markdown',
      markdown: {
        title: alert.title,
        text: `
### ${alert.title}

**告警级别**: ${alert.severity}  
**告警类型**: ${alert.type}  
**告警消息**: ${alert.message}  
**时间**: ${alert.timestamp}  
**工作流ID**: ${alert.workflowId}  
**执行ID**: ${alert.executionId}  

[查看详情](${$env.N8N_BASE_URL}/executions/${alert.executionId})
        `
      }
    };

    await this.postJson(config.webhook, message);
  }

  /** Sends the alert to a Slack incoming webhook as a single text message. */
  async sendSlackAlert(alert, config) {
    const text = [
      `*${alert.title}*`,
      `告警级别: ${alert.severity}`,
      `告警类型: ${alert.type}`,
      `告警消息: ${alert.message}`,
      `时间: ${alert.timestamp}`,
      `工作流ID: ${alert.workflowId}`,
      `执行ID: ${alert.executionId}`
    ].join('\n');

    await this.postJson(config.webhook, { text });
  }

  /**
   * Builds the HTML body for an email alert.
   * Delivery is not implemented here: the body is built but not sent.
   */
  async sendEmailAlert(alert, config) {
    // Email alert body
    const emailBody = `
    <h2>${alert.title}</h2>
    <p><strong>告警消息:</strong> ${alert.message}</p>
    <p><strong>严重级别:</strong> ${alert.severity}</p>
    <p><strong>时间:</strong> ${alert.timestamp}</p>
    <p><strong>工作流:</strong> ${alert.workflowId}</p>
    `;

    // Send through an email node...
  }
}

// Usage example
const alertManager = new AlertManager();

// Workflow statistics (hard-coded sample values)
const workflowStats = {
  errorRate: 0.15,           // 15% error rate
  avgResponseTime: 3000,     // 3 second response time
  consecutiveFails: 2,       // 2 consecutive failures
  totalExecutions: 100,
  failedExecutions: 15
};

// Check the statistics and send whatever alerts they trigger
const currentError = $json.error;
const alerts = await alertManager.checkAndAlert(workflowStats, currentError);

const sentCount = alerts.length;
console.log(`发送了 ${sentCount} 个告警`);

🔁 断路器模式

// Function node: circuit breaker implementation
/**
 * Circuit breaker whose state is kept in the workflow static data.
 *
 * States:
 *  - CLOSED: calls pass through; failures are counted.
 *  - OPEN: calls are rejected immediately until `recoveryTimeout` has elapsed
 *    since the last failure.
 *  - HALF_OPEN: one trial call is allowed; success closes the circuit, failure
 *    reopens it.
 */
class CircuitBreaker {
  /**
   * @param {object} [options]
   * @param {number} [options.failureThreshold=5] - Failures needed to open the circuit.
   * @param {number} [options.recoveryTimeout=60000] - Milliseconds to stay OPEN before a trial call.
   * @param {number} [options.monitoringPeriod=300000] - Window in milliseconds; while
   *   CLOSED, a failure that arrives later than this after the previous one restarts the count.
   */
  constructor(options = {}) {
    // `??` keeps an explicit 0 (e.g. recoveryTimeout: 0) instead of replacing it with the default
    this.failureThreshold = options.failureThreshold ?? 5;
    this.recoveryTimeout = options.recoveryTimeout ?? 60000; // 1 minute
    this.monitoringPeriod = options.monitoringPeriod ?? 300000; // 5 minutes

    // Restore state from static data
    // NOTE(review): n8n documents `$getWorkflowStaticData(type)`; confirm that
    // `$node.getWorkflowStaticData` exists in the n8n version in use.
    const staticData = $node.getWorkflowStaticData('node');
    this.state = staticData.circuitState || 'CLOSED';
    this.failureCount = staticData.failureCount || 0;
    this.lastFailureTime = staticData.lastFailureTime || 0;
    this.lastSuccessTime = staticData.lastSuccessTime || Date.now();
  }

  /**
   * Runs the operation through the breaker.
   *
   * @param {Function} operation - Async function to protect; called with no arguments.
   * @returns {Promise<*>} Whatever the operation resolves to.
   * @throws {Error} Immediately when the circuit is OPEN; otherwise rethrows
   *   the operation's own error after recording the failure.
   */
  async execute(operation) {
    // Move OPEN -> HALF_OPEN if the recovery timeout has passed
    this.updateState();

    if (this.state === 'OPEN') {
      throw new Error('Circuit breaker is OPEN - service is currently unavailable');
    }

    // HALF_OPEN and CLOSED run the call the same way; the bookkeeping differs
    try {
      const result = await operation();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  /** Moves an OPEN circuit to HALF_OPEN once `recoveryTimeout` has elapsed. */
  updateState() {
    const now = Date.now();

    if (this.state === 'OPEN' &&
        now - this.lastFailureTime > this.recoveryTimeout) {
      this.state = 'HALF_OPEN';
      console.log('Circuit breaker moved to HALF_OPEN state');
    }
  }

  /** Records a success: resets the failure count and closes a HALF_OPEN circuit. */
  onSuccess() {
    this.failureCount = 0;
    this.lastSuccessTime = Date.now();

    if (this.state === 'HALF_OPEN') {
      this.state = 'CLOSED';
      console.log('Circuit breaker moved to CLOSED state');
    }

    this.saveState();
  }

  /** Records a failure and opens the circuit when the threshold is reached. */
  onFailure() {
    const now = Date.now();

    // While CLOSED, a failure outside the monitoring window starts a new count
    if (this.state === 'CLOSED' &&
        now - this.lastFailureTime > this.monitoringPeriod) {
      this.failureCount = 0;
    }

    this.failureCount++;
    this.lastFailureTime = now;

    // A failed trial call in HALF_OPEN reopens the circuit regardless of the count
    if (this.state === 'HALF_OPEN' || this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
      console.log(`Circuit breaker moved to OPEN state after ${this.failureCount} failures`);
    }

    this.saveState();
  }

  /** Writes the current state back to the static data. */
  saveState() {
    const staticData = $node.getWorkflowStaticData('node');
    staticData.circuitState = this.state;
    staticData.failureCount = this.failureCount;
    staticData.lastFailureTime = this.lastFailureTime;
    staticData.lastSuccessTime = this.lastSuccessTime;
  }

  /** @returns {object} Snapshot `{ state, failureCount, lastFailureTime, lastSuccessTime }`. */
  getStatus() {
    return {
      state: this.state,
      failureCount: this.failureCount,
      lastFailureTime: this.lastFailureTime,
      lastSuccessTime: this.lastSuccessTime
    };
  }
}

// Protect an external API call with the circuit breaker
const circuitBreaker = new CircuitBreaker({
  failureThreshold: 3,
  recoveryTimeout: 30000,  // 30 seconds
  monitoringPeriod: 120000 // 2 minutes
});

/**
 * POSTs `$json.requestData` as JSON to `$env.EXTERNAL_API_URL`.
 *
 * @returns {Promise<*>} The parsed JSON response body.
 * @throws {Error} When the response status is not OK.
 */
async function protectedAPICall() {
  const endpoint = $env.EXTERNAL_API_URL;
  const requestInit = {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify($json.requestData)
  };

  const response = await fetch(endpoint, requestInit);

  // A non-OK status is a failure, so the circuit breaker can count it
  if (!response.ok) {
    throw new Error(`API call failed with status ${response.status}`);
  }

  return response.json();
}

try {
  const result = await circuitBreaker.execute(protectedAPICall);
  
  return [{
    json: {
      success: true,
      data: result,
      circuitStatus: circuitBreaker.getStatus()
    }
  }];
} catch (error) {
  return [{
    json: {
      success: false,
      error: error.message,
      circuitStatus: circuitBreaker.getStatus()
    }
  }];
}

🎯 实战案例:订单处理错误处理

业务场景

电商订单处理系统,需要处理支付、库存、物流等多个环节的潜在错误。

完整错误处理方案

// Function node: main order-processing flow
/**
 * Processes one order through validation, inventory, payment, shipping and
 * confirmation, and turns any failure into a structured result via a handler
 * chosen by error type.
 *
 * Depends on `ErrorLogger`, `AlertManager`, `$execution` and `$env` from the
 * surrounding scope. Several helper methods it calls (e.g. `updateOrderStatus`,
 * `notifyCustomer`) are not defined in this listing and must be supplied.
 */
class OrderProcessor {
  constructor() {
    this.logger = new ErrorLogger('order-processing', $execution.id);
    this.alertManager = new AlertManager();
    this.maxRetries = 3;
    this.retryDelay = 2000;
  }

  /**
   * Runs the full order pipeline.
   *
   * @param {object} orderData - The order; reads `orderId` here and passes the object to every step.
   * @returns {Promise<object>} `{ success: true, orderId, status: 'completed', paymentId }`
   *   on success, otherwise the result of the matching error handler.
   */
  async processOrder(orderData) {
    const orderId = orderData.orderId;
    this.logger.info('开始处理订单', { orderId });

    try {
      // 1. Validate the order data
      await this.validateOrder(orderData);

      // 2. Check inventory
      await this.checkInventory(orderData);

      // 3. Process payment
      const paymentResult = await this.processPayment(orderData);

      // 4. Update inventory
      await this.updateInventory(orderData);

      // 5. Create the shipping task
      await this.createShippingTask(orderData, paymentResult);

      // 6. Send the confirmation
      await this.sendConfirmation(orderData);

      this.logger.info('订单处理完成', { orderId });

      return {
        success: true,
        orderId: orderId,
        status: 'completed',
        paymentId: paymentResult.transactionId
      };

    } catch (error) {
      return await this.handleOrderError(orderData, error);
    }
  }

  /**
   * Logs the failure and dispatches to the handler for its error type.
   *
   * @param {object} orderData - The order being processed.
   * @param {Error} error - The failure raised by a pipeline step.
   * @returns {Promise<object>} The handler's failure result.
   */
  async handleOrderError(orderData, error) {
    const orderId = orderData.orderId;
    this.logger.error('订单处理失败', error, { orderId });

    // Pick the handling strategy from the error type
    const errorType = this.classifyError(error);

    switch (errorType) {
      case 'PAYMENT_FAILED':
        return await this.handlePaymentError(orderData, error);

      case 'INVENTORY_INSUFFICIENT':
        return await this.handleInventoryError(orderData, error);

      case 'NETWORK_ERROR':
        return await this.handleNetworkError(orderData, error);

      case 'SYSTEM_ERROR':
        return await this.handleSystemError(orderData, error);

      default:
        return await this.handleUnknownError(orderData, error);
    }
  }

  /** Handles a payment failure: updates status, releases stock, notifies the customer. */
  async handlePaymentError(orderData, error) {
    const orderId = orderData.orderId;
    // Evaluate once so the retry scheduling and the returned flag always agree
    const retryable = this.isRetryablePaymentError(error);

    // 1. Update the order status
    await this.updateOrderStatus(orderId, 'payment_failed');

    // 2. Release inventory
    await this.releaseInventory(orderData);

    // 3. Notify the customer
    await this.notifyCustomer(orderData, 'payment_failed', {
      message: '支付处理失败,请重新尝试或联系客服',
      retryUrl: `${$env.FRONTEND_URL}/orders/${orderId}/retry`
    });

    // 4. Schedule a retry when the payment error is retryable
    if (retryable) {
      await this.schedulePaymentRetry(orderData);
    }

    return {
      success: false,
      orderId: orderId,
      status: 'payment_failed',
      error: error.message,
      retryable: retryable
    };
  }

  /** Handles insufficient stock: updates status, offers alternatives, notifies purchasing. */
  async handleInventoryError(orderData, error) {
    const orderId = orderData.orderId;

    // Mark the order as short on inventory
    await this.updateOrderStatus(orderId, 'inventory_insufficient');

    // Notify the customer and offer alternatives
    await this.notifyCustomer(orderData, 'inventory_insufficient', {
      message: '商品库存不足',
      alternatives: await this.findAlternativeProducts(orderData.items)
    });

    // Notify the purchasing team
    await this.notifyPurchasing(orderData);

    return {
      success: false,
      orderId: orderId,
      status: 'inventory_insufficient',
      error: error.message,
      retryable: false
    };
  }

  /**
   * Handles a network failure. While `orderData.retryCount` is below
   * `maxRetries` it returns a retry descriptor; afterwards it marks the order
   * failed, notifies the customer and raises a critical alert.
   */
  async handleNetworkError(orderData, error) {
    const orderId = orderData.orderId;
    const retryCount = orderData.retryCount || 0;

    if (retryCount < this.maxRetries) {
      // Network error: prepare a retry
      this.logger.warn('网络错误,准备重试', {
        orderId,
        retryCount: retryCount + 1
      });

      const retryDelay = this.calculateRetryDelay(retryCount + 1);

      return {
        success: false,
        orderId: orderId,
        status: 'retrying',
        error: error.message,
        retryable: true,
        retryCount: retryCount + 1,
        retryAt: new Date(Date.now() + retryDelay).toISOString()
      };
    } else {
      // Retries exhausted: mark as failed
      await this.updateOrderStatus(orderId, 'network_failed');
      await this.notifyCustomer(orderData, 'processing_failed');

      // Raise an alert
      await this.alertManager.sendAlert({
        type: 'order_processing_failed',
        severity: 'critical',
        message: `订单 ${orderId} 在网络重试 ${this.maxRetries} 次后仍然失败`,
        data: { orderId, error: error.message }
      });

      return {
        success: false,
        orderId: orderId,
        status: 'failed',
        error: `网络错误重试 ${this.maxRetries} 次后仍然失败`,
        retryable: false
      };
    }
  }

  /** Handles a system error: alerts immediately, pauses the order, notifies support. */
  async handleSystemError(orderData, error) {
    const orderId = orderData.orderId;

    // System error: alert right away
    await this.alertManager.sendAlert({
      type: 'system_error',
      severity: 'critical',
      message: `系统错误导致订单 ${orderId} 处理失败`,
      data: { orderId, error: error.message, stack: error.stack }
    });

    // Pause order processing
    await this.updateOrderStatus(orderId, 'system_error');

    // Ask for manual intervention
    await this.notifySupport(orderData, error);

    return {
      success: false,
      orderId: orderId,
      status: 'system_error',
      error: error.message,
      retryable: false,
      requiresManualIntervention: true
    };
  }

  /**
   * Fallback for errors that match no known category (for example validation
   * failures). It raises a critical alert and returns a non-retryable failure.
   * It calls only `alertManager.sendAlert`, so it works even when the optional
   * helper methods are not supplied.
   */
  async handleUnknownError(orderData, error) {
    const orderId = orderData.orderId;

    await this.alertManager.sendAlert({
      type: 'unknown_error',
      severity: 'critical',
      message: `订单 ${orderId} 因未分类错误处理失败`,
      data: { orderId, error: error?.message, stack: error?.stack }
    });

    return {
      success: false,
      orderId: orderId,
      status: 'failed',
      error: error?.message,
      retryable: false,
      requiresManualIntervention: true
    };
  }

  /**
   * Maps an error to a category by keyword search in its lower-cased message.
   * Categories are checked in order: payment, inventory, network, system.
   *
   * @param {object} error - Error-like value; a missing `message` yields 'UNKNOWN_ERROR'.
   * @returns {string} One of 'PAYMENT_FAILED', 'INVENTORY_INSUFFICIENT',
   *   'NETWORK_ERROR', 'SYSTEM_ERROR' or 'UNKNOWN_ERROR'.
   */
  classifyError(error) {
    const message = String(error?.message ?? '').toLowerCase();

    if (message.includes('payment') || message.includes('transaction')) {
      return 'PAYMENT_FAILED';
    }

    if (message.includes('inventory') || message.includes('stock')) {
      return 'INVENTORY_INSUFFICIENT';
    }

    if (message.includes('network') || message.includes('timeout') ||
        message.includes('connection')) {
      return 'NETWORK_ERROR';
    }

    if (message.includes('database') || message.includes('system') ||
        message.includes('internal')) {
      return 'SYSTEM_ERROR';
    }

    return 'UNKNOWN_ERROR';
  }

  /**
   * Exponential backoff with up to 10% jitter, capped at 30 seconds.
   *
   * @param {number} retryCount - 1-based retry number.
   * @returns {number} Delay in milliseconds (may be fractional).
   */
  calculateRetryDelay(retryCount) {
    const baseDelay = this.retryDelay * Math.pow(2, retryCount - 1);
    const maxDelay = 30000; // 30 second cap
    const jitter = Math.random() * 0.1 * baseDelay;

    return Math.min(baseDelay + jitter, maxDelay);
  }

  // Other helper methods...

  /**
   * Checks that the required order fields are present.
   *
   * @throws {Error} When `orderId` or `customerId` is missing, or `items` is empty.
   */
  async validateOrder(orderData) {
    if (!orderData.orderId) throw new Error('订单ID缺失');
    if (!orderData.customerId) throw new Error('客户ID缺失');
    if (!orderData.items || orderData.items.length === 0) {
      throw new Error('订单商品列表为空');
    }
    // More validation logic...
  }

  async checkInventory(orderData) {
    // Inventory check logic...
  }

  async processPayment(orderData) {
    // Payment processing logic...
  }

  // ... other method implementations
}

// 主处理逻辑
const processor = new OrderProcessor();
const result = await processor.processOrder($json);

return [{ json: result }];

📋 错误处理最佳实践

1. 分层错误处理

// Application-level error handling: branch on the error's class so each kind
// gets its own handling path.
try {
  // Business logic
} catch (error) {
  if (error instanceof BusinessError) {
    // Handle business errors
  } else if (error instanceof ValidationError) {
    // Handle validation errors
  } else {
    // Handle system errors (anything that is neither of the above)
  }
}

2. 错误边界设置

// Set error boundaries around critical nodes
// Operation types that count as critical.
const criticalOperations = [
  'payment_processing',
  'data_persistence', 
  'external_api_calls'
];

if (criticalOperations.includes($json.operationType)) {
  // Use stricter error handling
  // Do not allow execution to continue
} else {
  // Allow degraded handling
}

3. 用户友好的错误信息

/**
 * Maps an error code to a message suitable for showing to end users.
 *
 * @param {object} error - Error-like value; only `error.code` is read.
 * @returns {string} The message for a known code, otherwise a generic fallback.
 */
function formatUserError(error) {
  const userFriendlyMessages = {
    'PAYMENT_DECLINED': '您的支付被拒绝,请检查银行卡信息或联系银行',
    'NETWORK_TIMEOUT': '网络连接超时,请稍后重试',
    'INVENTORY_INSUFFICIENT': '商品库存不足,我们将尽快补货',
    'SYSTEM_MAINTENANCE': '系统正在维护中,请稍后再试'
  };
  const fallbackMessage = '操作失败,请联系客服';

  // Own-property check so codes like 'toString' or 'constructor' do not
  // resolve to members inherited from Object.prototype
  const code = error?.code;
  const isKnownCode = Object.prototype.hasOwnProperty.call(userFriendlyMessages, code);

  return (isKnownCode && userFriendlyMessages[code]) || fallbackMessage;
}

🚀 下一步学习

掌握错误处理后,继续学习:

  1. API集成 - 处理外部服务集成中的错误
  2. 性能优化 - 优化错误处理的性能影响
  3. 监控分析 - 建立完整的监控体系

完善的错误处理是生产级工作流的基石。通过系统性的错误处理策略,你能构建出健壮、可靠的自动化系统,即使在异常情况下也能优雅地运行!