Logon8n中文教程

循环与迭代

掌握N8N中的循环处理技巧,高效处理批量数据和重复任务

循环与迭代

在处理大量数据或重复任务时,循环和迭代是必不可少的技能。N8N 提供了多种方式来实现循环处理,本章将详细介绍各种循环模式和最佳实践。

🔄 循环处理概念

循环类型分类

循环类型特点适用场景N8N实现方式
数据循环遍历数据集批量处理、数据转换Split In Batches
条件循环基于条件重复轮询、重试机制Loop Over Items
时间循环按时间间隔定期检查、监控Cron + 条件判断
嵌套循环多层循环复杂数据结构处理多个循环节点组合

📊 Split In Batches 节点详解

Split In Batches 是 N8N 中最常用的循环节点,用于将大数据集分批处理。

基本配置

// Split In Batches 节点配置
Batch Size: 10          // 每批处理的数据量
Options: {
  Reset: false,         // 是否重置计数器
  Keep Input Data: true // 保留原始输入数据
}

数据流示例

// 输入数据:100个用户
[
  { id: 1, name: "张三", email: "[email protected]" },
  { id: 2, name: "李四", email: "[email protected]" },
  // ... 98 more users
]

// Split In Batches (Batch Size: 10)
// 第一批:用户 1-10
// 第二批:用户 11-20
// ...
// 第十批:用户 91-100

批处理工作流示例

// 1. 获取用户列表 (HTTP Request)
GET /api/users

// 2. 分批处理 (Split In Batches)
Batch Size: 50

// 3. 批处理逻辑 (Function)
const batch = $input.all();
console.log(`Processing batch of ${batch.length} users`);

const processedUsers = batch.map(item => {
  const user = item.json;
  return {
    json: {
      userId: user.id,
      fullName: `${user.firstName} ${user.lastName}`,
      emailDomain: user.email.split('@')[1],
      processedAt: new Date().toISOString()
    }
  };
});

return processedUsers;

// 4. 保存批次结果 (MySQL)
INSERT INTO processed_users (user_id, full_name, email_domain, processed_at)
VALUES ?

// 5. 等待处理完成 (Wait)
Amount: 1
Unit: seconds

// 6. 检查是否还有更多批次
// Split In Batches 会自动循环直到处理完所有数据

🔁 Loop Over Items 节点

Loop Over Items 适用于需要对每个数据项进行独立处理的场景。

基本用法

// Loop Over Items 配置
// 自动遍历输入的每个数据项

// 示例:为每个用户发送个性化邮件
const user = $json;  // 当前循环的用户数据

const personalizedContent = `
亲爱的 ${user.name},

您的账户状态:${user.status}
上次登录:${user.lastLogin}
积分余额:${user.points}

感谢您的使用!
`;

return [{
  json: {
    email: user.email,
    subject: `${user.name},您的账户摘要`,
    content: personalizedContent
  }
}];

错误处理在循环中

// Function 节点:带错误处理的循环
const user = $json;
const errors = [];
const successes = [];

try {
  // 处理用户数据
  const result = await processUser(user);
  successes.push({
    userId: user.id,
    result: result
  });
} catch (error) {
  errors.push({
    userId: user.id,
    error: error.message,
    timestamp: new Date().toISOString()
  });
  
  // 记录错误但继续处理下一个用户
  console.error(`Failed to process user ${user.id}:`, error.message);
}

// 返回处理结果
return [{
  json: {
    success: errors.length === 0,
    userId: user.id,
    errors: errors,
    successes: successes
  }
}];

🔄 条件循环模式

轮询模式

// 轮询API直到任务完成
// Function 节点:检查任务状态
const taskId = $json.taskId;
const maxAttempts = 30;  // 最大重试次数
const currentAttempt = $json.attempt || 1;

// 检查任务状态
const response = await fetch(`/api/tasks/${taskId}/status`);
const taskStatus = await response.json();

if (taskStatus.status === 'completed') {
  // 任务完成,退出循环
  return [{
    json: {
      taskId: taskId,
      status: 'completed',
      result: taskStatus.result,
      attempts: currentAttempt
    }
  }];
} else if (currentAttempt >= maxAttempts) {
  // 达到最大重试次数
  throw new Error(`Task ${taskId} did not complete within ${maxAttempts} attempts`);
} else {
  // 继续等待,增加重试计数
  return [{
    json: {
      taskId: taskId,
      status: 'pending',
      attempt: currentAttempt + 1,
      nextCheckAt: new Date(Date.now() + 30000).toISOString()  // 30秒后重试
    }
  }];
}

重试机制

// Function 节点:带指数退避的重试
const operation = $json.operation;
const maxRetries = 5;
const currentRetry = $json.retryCount || 0;

try {
  // 执行操作
  const result = await performOperation(operation);
  return [{
    json: {
      success: true,
      result: result,
      retriesUsed: currentRetry
    }
  }];
} catch (error) {
  if (currentRetry < maxRetries) {
    // 计算下次重试的延迟时间(指数退避)
    const delay = Math.pow(2, currentRetry) * 1000;  // 1s, 2s, 4s, 8s, 16s
    
    console.log(`Operation failed, retrying in ${delay}ms (attempt ${currentRetry + 1}/${maxRetries})`);
    
    return [{
      json: {
        ...operation,
        retryCount: currentRetry + 1,
        retryDelay: delay,
        lastError: error.message
      }
    }];
  } else {
    // 重试次数用完,抛出错误
    throw new Error(`Operation failed after ${maxRetries} retries: ${error.message}`);
  }
}

🎯 实战案例:电商库存同步

业务场景

定期同步多个销售渠道的库存数据到中央系统。

工作流设计

定时触发 → 获取渠道列表 → 循环处理渠道 → 获取库存数据 → 批量更新 → 发送报告

实现步骤

1. 获取需要同步的渠道

// HTTP Request: 获取活跃渠道列表
GET /api/channels?status=active

// 返回数据示例
[
  { id: 1, name: "天猫旗舰店", api_url: "https://api.tmall.com", credentials: {...} },
  { id: 2, name: "京东自营", api_url: "https://api.jd.com", credentials: {...} },
  { id: 3, name: "微信小程序", api_url: "https://api.wechat.com", credentials: {...} }
]

2. 循环处理每个渠道

// Loop Over Items: 遍历每个渠道
const channel = $json;
console.log(`Starting inventory sync for channel: ${channel.name}`);

// 获取渠道库存数据
const inventoryResponse = await fetch(`${channel.api_url}/inventory`, {
  headers: {
    'Authorization': `Bearer ${channel.credentials.token}`,
    'Content-Type': 'application/json'
  }
});

const inventoryData = await inventoryResponse.json();

return [{
  json: {
    channelId: channel.id,
    channelName: channel.name,
    inventoryData: inventoryData,
    syncStartTime: new Date().toISOString()
  }
}];

3. 批量处理库存数据

// Split In Batches: 分批处理库存数据
Batch Size: 100

// Function: 批量更新库存
const inventoryBatch = $input.all();
const updates = [];
const errors = [];

for (const item of inventoryBatch) {
  const inventory = item.json;
  
  try {
    // 验证数据格式
    if (!inventory.sku || inventory.quantity < 0) {
      throw new Error(`Invalid inventory data for SKU: ${inventory.sku}`);
    }
    
    updates.push({
      sku: inventory.sku,
      quantity: inventory.quantity,
      price: inventory.price,
      channel_id: inventory.channelId,
      updated_at: new Date().toISOString()
    });
  } catch (error) {
    errors.push({
      sku: inventory.sku || 'unknown',
      error: error.message,
      channelId: inventory.channelId
    });
  }
}

// 批量更新数据库
if (updates.length > 0) {
  const updateQuery = `
    INSERT INTO inventory (sku, quantity, price, channel_id, updated_at)
    VALUES ?
    ON DUPLICATE KEY UPDATE
    quantity = VALUES(quantity),
    price = VALUES(price),
    updated_at = VALUES(updated_at)
  `;
  
  await executeQuery(updateQuery, [updates]);
}

return [{
  json: {
    batchSize: inventoryBatch.length,
    updatesCount: updates.length,
    errorsCount: errors.length,
    errors: errors
  }
}];

4. 生成同步报告

// Aggregate: 汇总所有批次结果
const allBatches = $input.all();
const totalUpdates = allBatches.reduce((sum, batch) => sum + batch.json.updatesCount, 0);
const totalErrors = allBatches.reduce((sum, batch) => sum + batch.json.errorsCount, 0);
const allErrors = allBatches.flatMap(batch => batch.json.errors);

const syncReport = {
  syncTime: new Date().toISOString(),
  channelsProcessed: allBatches.length,
  totalInventoryItems: totalUpdates + totalErrors,
  successfulUpdates: totalUpdates,
  failedUpdates: totalErrors,
  successRate: ((totalUpdates / (totalUpdates + totalErrors)) * 100).toFixed(2) + '%',
  errors: allErrors
};

console.log('Inventory sync completed:', syncReport);

// 发送报告邮件
const emailContent = `
库存同步完成报告

同步时间:${syncReport.syncTime}
处理渠道:${syncReport.channelsProcessed}个
库存数据:${syncReport.totalInventoryItems}条
成功更新:${syncReport.successfulUpdates}条
失败数量:${syncReport.failedUpdates}条
成功率:${syncReport.successRate}

${syncReport.errors.length > 0 ? '错误详情:\n' + syncReport.errors.map(e => `- SKU ${e.sku}: ${e.error}`).join('\n') : ''}
`;

return [{
  json: {
    ...syncReport,
    emailContent: emailContent
  }
}];

🔧 循环优化技巧

1. 内存管理

// 处理大数据集时的内存优化
const BATCH_SIZE = 50;  // 减小批次大小
const PROCESSING_DELAY = 100;  // 添加处理延迟

// Function 节点
const items = $input.all();

// 强制垃圾回收(在支持的环境中)
if (global.gc) {
  global.gc();
}

// 分批处理,避免内存积累
for (let i = 0; i < items.length; i += BATCH_SIZE) {
  const batch = items.slice(i, i + BATCH_SIZE);
  
  // 处理批次
  const processed = await processBatch(batch);
  
  // 添加短暂延迟,让系统有机会清理内存
  await new Promise(resolve => setTimeout(resolve, PROCESSING_DELAY));
}

2. 错误恢复

// Function 节点:断点续传机制
const checkpoint = $node.getWorkflowStaticData('node').checkpoint || 0;
const allItems = $json.items;
const remainingItems = allItems.slice(checkpoint);

console.log(`Resuming from checkpoint: ${checkpoint}/${allItems.length}`);

const results = [];
let currentIndex = checkpoint;

for (const item of remainingItems) {
  try {
    const result = await processItem(item);
    results.push(result);
    currentIndex++;
    
    // 更新检查点
    $node.getWorkflowStaticData('node').checkpoint = currentIndex;
    
    // 每处理10个项目保存一次进度
    if (currentIndex % 10 === 0) {
      console.log(`Progress: ${currentIndex}/${allItems.length}`);
    }
  } catch (error) {
    console.error(`Failed to process item at index ${currentIndex}:`, error);
    
    // 保存当前进度并抛出错误,以便后续恢复
    $node.getWorkflowStaticData('node').checkpoint = currentIndex;
    throw error;
  }
}

// 完成后清除检查点
$node.getWorkflowStaticData('node').checkpoint = 0;

return results;

3. 并发控制

// Function 节点:限制并发数量
const CONCURRENT_LIMIT = 5;
const items = $input.all();

const processWithConcurrencyLimit = async (items, limit) => {
  const results = [];
  
  for (let i = 0; i < items.length; i += limit) {
    const batch = items.slice(i, i + limit);
    
    // 并发处理批次内的项目
    const promises = batch.map(item => processItem(item.json));
    const batchResults = await Promise.allSettled(promises);
    
    // 处理结果
    batchResults.forEach((result, index) => {
      if (result.status === 'fulfilled') {
        results.push({ success: true, data: result.value });
      } else {
        results.push({ 
          success: false, 
          error: result.reason.message,
          item: batch[index].json
        });
      }
    });
    
    console.log(`Completed batch ${Math.floor(i/limit) + 1}/${Math.ceil(items.length/limit)}`);
  }
  
  return results;
};

const results = await processWithConcurrencyLimit(items, CONCURRENT_LIMIT);
return results.map(r => ({ json: r }));

📈 性能监控

循环性能分析

// Function 节点:性能监控
class LoopPerformanceMonitor {
  constructor() {
    this.startTime = Date.now();
    this.itemCount = 0;
    this.errorCount = 0;
    this.processingTimes = [];
  }
  
  startItem() {
    this.itemStartTime = Date.now();
  }
  
  endItem(success = true) {
    const duration = Date.now() - this.itemStartTime;
    this.processingTimes.push(duration);
    this.itemCount++;
    
    if (!success) {
      this.errorCount++;
    }
  }
  
  getStats() {
    const totalTime = Date.now() - this.startTime;
    const avgItemTime = this.processingTimes.reduce((a, b) => a + b, 0) / this.processingTimes.length;
    const maxItemTime = Math.max(...this.processingTimes);
    const minItemTime = Math.min(...this.processingTimes);
    
    return {
      totalItems: this.itemCount,
      totalTime: totalTime,
      avgItemTime: avgItemTime,
      maxItemTime: maxItemTime,
      minItemTime: minItemTime,
      itemsPerSecond: (this.itemCount / totalTime * 1000).toFixed(2),
      errorRate: ((this.errorCount / this.itemCount) * 100).toFixed(2) + '%'
    };
  }
}

// 使用示例
const monitor = new LoopPerformanceMonitor();

for (const item of $input.all()) {
  monitor.startItem();
  
  try {
    await processItem(item.json);
    monitor.endItem(true);
  } catch (error) {
    monitor.endItem(false);
    console.error('Item processing failed:', error);
  }
}

const stats = monitor.getStats();
console.log('Loop performance stats:', stats);

🎯 最佳实践

1. 选择合适的循环方式

// 小数据集(< 100项):直接处理
if (items.length < 100) {
  return items.map(processItem);
}

// 中等数据集(100-1000项):Split In Batches
else if (items.length < 1000) {
  // 使用 Split In Batches,批次大小 20-50
}

// 大数据集(> 1000项):分页处理
else {
  // 使用分页API + 状态管理
}

2. 错误处理策略

// 容错处理:记录错误但继续执行
const results = [];
const errors = [];

for (const item of items) {
  try {
    const result = await processItem(item);
    results.push(result);
  } catch (error) {
    errors.push({
      item: item,
      error: error.message,
      timestamp: new Date().toISOString()
    });
    
    // 关键错误:停止处理
    if (error.critical) {
      throw error;
    }
  }
}

3. 进度跟踪

// 实时进度更新
const totalItems = items.length;
let processedItems = 0;

for (const item of items) {
  await processItem(item);
  processedItems++;
  
  // 每10%进度报告一次
  if (processedItems % Math.ceil(totalItems / 10) === 0) {
    const progress = (processedItems / totalItems * 100).toFixed(1);
    console.log(`Progress: ${progress}% (${processedItems}/${totalItems})`);
    
    // 可选:发送进度通知
    await sendProgressUpdate(progress);
  }
}

🚀 下一步学习

掌握循环与迭代后,继续学习:

  1. 数据转换 - 高级数据处理技巧
  2. 错误处理 - 构建稳定的循环流程
  3. 性能优化 - 优化循环性能

循环与迭代是处理大规模数据的核心技能。通过合理选择循环方式,优化处理逻辑,你能构建出高效、稳定的批量处理工作流!