Loops and Iteration
Master N8N's loop-processing techniques to handle batch data and repetitive tasks efficiently
When working with large amounts of data or repetitive tasks, loops and iteration are indispensable. N8N offers several ways to implement loop processing; this chapter walks through the main looping patterns and best practices.
🔄 Loop Processing Concepts
Loop Type Categories
| Loop Type | Characteristics | Typical Use Cases | N8N Implementation |
|---|---|---|---|
| Data loop | Iterates over a dataset | Batch processing, data transformation | Split In Batches |
| Conditional loop | Repeats based on a condition | Polling, retry mechanisms | Loop Over Items |
| Timed loop | Runs at time intervals | Periodic checks, monitoring | Cron + condition check |
| Nested loop | Multiple loop levels | Complex data structure processing | Combination of loop nodes |
📊 The Split In Batches Node in Detail
Split In Batches is the most commonly used looping node in N8N; it splits large datasets into batches for processing.
Basic Configuration
// Split In Batches node configuration
Batch Size: 10 // number of items per batch
Options: {
Reset: false, // whether to reset the counter
Keep Input Data: true // keep the original input data
}
Data Flow Example
// Input data: 100 users
[
{ id: 1, name: "Zhang San", email: "[email protected]" },
{ id: 2, name: "Li Si", email: "[email protected]" },
// ... 98 more users
]
// Split In Batches (Batch Size: 10)
// Batch 1: users 1-10
// Batch 2: users 11-20
// ...
// Batch 10: users 91-100
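The batch boundaries above can be reproduced with a small chunking helper. This is a plain-JavaScript sketch of what Split In Batches does conceptually, not the node's actual implementation:

```javascript
// Split an array into consecutive batches of at most `size` items.
function splitInBatches(items, size) {
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// 100 users split into batches of 10 yields 10 batches:
const users = Array.from({ length: 100 }, (_, i) => ({ id: i + 1 }));
const batches = splitInBatches(users, 10);
console.log(batches.length);   // 10
console.log(batches[9][9].id); // 100 (last user of the last batch)
```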
Batch Processing Workflow Example
// 1. Fetch the user list (HTTP Request)
GET /api/users
// 2. Split into batches (Split In Batches)
Batch Size: 50
// 3. Batch processing logic (Function)
const batch = $input.all();
console.log(`Processing batch of ${batch.length} users`);
const processedUsers = batch.map(item => {
const user = item.json;
return {
json: {
userId: user.id,
fullName: `${user.firstName} ${user.lastName}`,
emailDomain: user.email.split('@')[1],
processedAt: new Date().toISOString()
}
};
});
return processedUsers;
// 4. Save the batch results (MySQL)
INSERT INTO processed_users (user_id, full_name, email_domain, processed_at)
VALUES ?
// 5. Wait before the next batch (Wait)
Amount: 1
Unit: seconds
// 6. Check whether more batches remain
// Split In Batches loops automatically until all data has been processed
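The automatic looping in step 6 can be pictured as a driver that keeps pulling the next batch until none remain. This is a plain-JavaScript sketch of the control flow only, not n8n's internal implementation; `processBatch` stands in for steps 3-5:

```javascript
// Emulates Split In Batches' loop: one batch per iteration,
// stopping when no items are left (n8n exposes this condition
// via the node's noItemsLeft context value).
async function runBatchLoop(items, batchSize, processBatch) {
  let cursor = 0;
  let iterations = 0;
  while (cursor < items.length) {
    const batch = items.slice(cursor, cursor + batchSize);
    await processBatch(batch);
    cursor += batch.length;
    iterations++;
  }
  return iterations;
}
```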
🔁 The Loop Over Items Node
Loop Over Items suits scenarios where each data item needs to be processed independently.
Basic Usage
// Loop Over Items configuration
// Automatically iterates over each input item
// Example: send a personalized email to each user
const user = $json; // the user for the current iteration
const personalizedContent = `
Dear ${user.name},
Account status: ${user.status}
Last login: ${user.lastLogin}
Points balance: ${user.points}
Thank you for using our service!
`;
return [{
json: {
email: user.email,
subject: `${user.name}, your account summary`,
content: personalizedContent
}
}];
Error Handling Inside Loops
// Function node: a loop body with error handling
const user = $json;
const errors = [];
const successes = [];
try {
// Process the user's data (processUser is a placeholder for your own logic)
const result = await processUser(user);
successes.push({
userId: user.id,
result: result
});
} catch (error) {
errors.push({
userId: user.id,
error: error.message,
timestamp: new Date().toISOString()
});
// Log the error but continue with the next user
console.error(`Failed to process user ${user.id}:`, error.message);
}
// Return the processing result
return [{
json: {
success: errors.length === 0,
userId: user.id,
errors: errors,
successes: successes
}
}];
🔄 Conditional Loop Patterns
Polling Pattern
// Poll an API until the task completes
// Function node: check the task status
const taskId = $json.taskId;
const maxAttempts = 30; // maximum number of attempts
const currentAttempt = $json.attempt || 1;
// Check the task status
const response = await fetch(`/api/tasks/${taskId}/status`);
const taskStatus = await response.json();
if (taskStatus.status === 'completed') {
// Task finished, exit the loop
return [{
json: {
taskId: taskId,
status: 'completed',
result: taskStatus.result,
attempts: currentAttempt
}
}];
} else if (currentAttempt >= maxAttempts) {
// Maximum number of attempts reached
throw new Error(`Task ${taskId} did not complete within ${maxAttempts} attempts`);
} else {
// Keep waiting and increment the attempt counter
return [{
json: {
taskId: taskId,
status: 'pending',
attempt: currentAttempt + 1,
nextCheckAt: new Date(Date.now() + 30000).toISOString() // retry in 30 seconds
}
}];
}
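The polling logic above can be exercised in isolation by injecting the status check as a function. This is a sketch; `checkStatus` stands in for the `fetch` call, and the wait between checks (normally a Wait node) is omitted:

```javascript
// Polls checkStatus() until it reports "completed",
// failing after maxAttempts tries.
async function pollUntilComplete(checkStatus, maxAttempts) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const status = await checkStatus();
    if (status.status === 'completed') {
      return { ...status, attempts: attempt };
    }
  }
  throw new Error(`Task did not complete within ${maxAttempts} attempts`);
}
```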
Retry Mechanism
// Function node: retry with exponential backoff
const operation = $json.operation;
const maxRetries = 5;
const currentRetry = $json.retryCount || 0;
try {
// Execute the operation (performOperation is a placeholder for your own logic)
const result = await performOperation(operation);
return [{
json: {
success: true,
result: result,
retriesUsed: currentRetry
}
}];
} catch (error) {
if (currentRetry < maxRetries) {
// Compute the delay before the next retry (exponential backoff)
const delay = Math.pow(2, currentRetry) * 1000; // 1s, 2s, 4s, 8s, 16s
console.log(`Operation failed, retrying in ${delay}ms (attempt ${currentRetry + 1}/${maxRetries})`);
return [{
json: {
...operation,
retryCount: currentRetry + 1,
retryDelay: delay,
lastError: error.message
}
}];
} else {
// Retries exhausted, rethrow the error
throw new Error(`Operation failed after ${maxRetries} retries: ${error.message}`);
}
}
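The `Math.pow(2, currentRetry) * 1000` schedule above can be factored into a helper, optionally capped so late retries don't wait unboundedly. A sketch; the cap is an addition not present in the workflow code:

```javascript
// Exponential backoff delay for a given retry number (0-based),
// capped at maxDelayMs.
function backoffDelay(retryCount, baseMs = 1000, maxDelayMs = 30000) {
  return Math.min(Math.pow(2, retryCount) * baseMs, maxDelayMs);
}

// Delays for retries 0..4: 1s, 2s, 4s, 8s, 16s
const schedule = [0, 1, 2, 3, 4].map(r => backoffDelay(r));
console.log(schedule); // [1000, 2000, 4000, 8000, 16000]
```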
🎯 Case Study: E-commerce Inventory Sync
Business Scenario
Periodically sync inventory data from multiple sales channels into a central system.
Workflow Design
Scheduled trigger → fetch channel list → loop over channels → fetch inventory data → bulk update → send report
Implementation Steps
1. Fetch the channels to sync
// HTTP Request: fetch the list of active channels
GET /api/channels?status=active
// Sample response
[
{ id: 1, name: "Tmall Flagship Store", api_url: "https://api.tmall.com", credentials: {...} },
{ id: 2, name: "JD Direct", api_url: "https://api.jd.com", credentials: {...} },
{ id: 3, name: "WeChat Mini Program", api_url: "https://api.wechat.com", credentials: {...} }
]
2. Loop over each channel
// Loop Over Items: iterate over each channel
const channel = $json;
console.log(`Starting inventory sync for channel: ${channel.name}`);
// Fetch the channel's inventory data
const inventoryResponse = await fetch(`${channel.api_url}/inventory`, {
headers: {
'Authorization': `Bearer ${channel.credentials.token}`,
'Content-Type': 'application/json'
}
});
const inventoryData = await inventoryResponse.json();
return [{
json: {
channelId: channel.id,
channelName: channel.name,
inventoryData: inventoryData,
syncStartTime: new Date().toISOString()
}
}];
3. Process inventory data in batches
// Split In Batches: process the inventory data in batches
Batch Size: 100
// Function: bulk-update the inventory
const inventoryBatch = $input.all();
const updates = [];
const errors = [];
for (const item of inventoryBatch) {
const inventory = item.json;
try {
// Validate the data format
if (!inventory.sku || inventory.quantity < 0) {
throw new Error(`Invalid inventory data for SKU: ${inventory.sku}`);
}
updates.push({
sku: inventory.sku,
quantity: inventory.quantity,
price: inventory.price,
channel_id: inventory.channelId,
updated_at: new Date().toISOString()
});
} catch (error) {
errors.push({
sku: inventory.sku || 'unknown',
error: error.message,
channelId: inventory.channelId
});
}
}
// Bulk-update the database (executeQuery is a placeholder for your DB helper)
if (updates.length > 0) {
const updateQuery = `
INSERT INTO inventory (sku, quantity, price, channel_id, updated_at)
VALUES ?
ON DUPLICATE KEY UPDATE
quantity = VALUES(quantity),
price = VALUES(price),
updated_at = VALUES(updated_at)
`;
await executeQuery(updateQuery, [updates]);
}
return [{
json: {
batchSize: inventoryBatch.length,
updatesCount: updates.length,
errorsCount: errors.length,
errors: errors
}
}];
4. Generate the sync report
// Aggregate: combine the results from all batches
const allBatches = $input.all();
const totalUpdates = allBatches.reduce((sum, batch) => sum + batch.json.updatesCount, 0);
const totalErrors = allBatches.reduce((sum, batch) => sum + batch.json.errorsCount, 0);
const allErrors = allBatches.flatMap(batch => batch.json.errors);
const syncReport = {
syncTime: new Date().toISOString(),
channelsProcessed: allBatches.length,
totalInventoryItems: totalUpdates + totalErrors,
successfulUpdates: totalUpdates,
failedUpdates: totalErrors,
successRate: ((totalUpdates / (totalUpdates + totalErrors)) * 100).toFixed(2) + '%',
errors: allErrors
};
console.log('Inventory sync completed:', syncReport);
// Send the report email
const emailContent = `
Inventory Sync Report
Sync time: ${syncReport.syncTime}
Channels processed: ${syncReport.channelsProcessed}
Inventory items: ${syncReport.totalInventoryItems}
Successful updates: ${syncReport.successfulUpdates}
Failed updates: ${syncReport.failedUpdates}
Success rate: ${syncReport.successRate}
${syncReport.errors.length > 0 ? 'Error details:\n' + syncReport.errors.map(e => `- SKU ${e.sku}: ${e.error}`).join('\n') : ''}
`;
return [{
json: {
...syncReport,
emailContent: emailContent
}
}];
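One edge case in the report above: if a sync run processes zero items, the success-rate division yields `NaN`. A small guard avoids that (a sketch):

```javascript
// Success rate as a percentage string; returns "0.00%" instead of NaN
// when no items were processed at all.
function successRate(successCount, errorCount) {
  const total = successCount + errorCount;
  if (total === 0) return '0.00%';
  return ((successCount / total) * 100).toFixed(2) + '%';
}

console.log(successRate(98, 2)); // "98.00%"
console.log(successRate(0, 0));  // "0.00%" rather than "NaN%"
```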
🔧 Loop Optimization Techniques
1. Memory Management
// Memory optimization when processing large datasets
const BATCH_SIZE = 50; // smaller batch size
const PROCESSING_DELAY = 100; // per-batch delay in ms
// Function node
const items = $input.all();
// Force garbage collection (only in environments that expose it)
if (global.gc) {
global.gc();
}
// Process in batches to avoid accumulating memory
// (processBatch is a placeholder for your own batch logic)
const processedItems = [];
for (let i = 0; i < items.length; i += BATCH_SIZE) {
const batch = items.slice(i, i + BATCH_SIZE);
const processed = await processBatch(batch);
processedItems.push(...processed);
// Brief pause so the system can reclaim memory between batches
await new Promise(resolve => setTimeout(resolve, PROCESSING_DELAY));
}
return processedItems;
2. Error Recovery
// Function node: checkpoint/resume mechanism
// ($getWorkflowStaticData persists data across workflow executions)
const checkpoint = $getWorkflowStaticData('node').checkpoint || 0;
const allItems = $json.items;
const remainingItems = allItems.slice(checkpoint);
console.log(`Resuming from checkpoint: ${checkpoint}/${allItems.length}`);
const results = [];
let currentIndex = checkpoint;
for (const item of remainingItems) {
try {
const result = await processItem(item);
results.push(result);
currentIndex++;
// Update the checkpoint
$getWorkflowStaticData('node').checkpoint = currentIndex;
// Log progress every 10 items
if (currentIndex % 10 === 0) {
console.log(`Progress: ${currentIndex}/${allItems.length}`);
}
} catch (error) {
console.error(`Failed to process item at index ${currentIndex}:`, error);
// Save the current progress and rethrow so a later run can resume
$getWorkflowStaticData('node').checkpoint = currentIndex;
throw error;
}
}
// Clear the checkpoint once everything has been processed
$getWorkflowStaticData('node').checkpoint = 0;
return results;
3. Concurrency Control
// Function node: limit the number of concurrent operations
const CONCURRENT_LIMIT = 5;
const items = $input.all();
const processWithConcurrencyLimit = async (items, limit) => {
const results = [];
for (let i = 0; i < items.length; i += limit) {
const batch = items.slice(i, i + limit);
// Process the items within this batch concurrently
const promises = batch.map(item => processItem(item.json));
const batchResults = await Promise.allSettled(promises);
// Collect the results
batchResults.forEach((result, index) => {
if (result.status === 'fulfilled') {
results.push({ success: true, data: result.value });
} else {
results.push({
success: false,
error: result.reason.message,
item: batch[index].json
});
}
});
console.log(`Completed batch ${Math.floor(i/limit) + 1}/${Math.ceil(items.length/limit)}`);
}
return results;
};
const results = await processWithConcurrencyLimit(items, CONCURRENT_LIMIT);
return results.map(r => ({ json: r }));
📈 Performance Monitoring
Loop Performance Analysis
// Function node: performance monitoring
class LoopPerformanceMonitor {
constructor() {
this.startTime = Date.now();
this.itemCount = 0;
this.errorCount = 0;
this.processingTimes = [];
}
startItem() {
this.itemStartTime = Date.now();
}
endItem(success = true) {
const duration = Date.now() - this.itemStartTime;
this.processingTimes.push(duration);
this.itemCount++;
if (!success) {
this.errorCount++;
}
}
getStats() {
const totalTime = Date.now() - this.startTime;
const avgItemTime = this.processingTimes.reduce((a, b) => a + b, 0) / this.processingTimes.length;
const maxItemTime = Math.max(...this.processingTimes);
const minItemTime = Math.min(...this.processingTimes);
return {
totalItems: this.itemCount,
totalTime: totalTime,
avgItemTime: avgItemTime,
maxItemTime: maxItemTime,
minItemTime: minItemTime,
itemsPerSecond: (this.itemCount / totalTime * 1000).toFixed(2),
errorRate: ((this.errorCount / this.itemCount) * 100).toFixed(2) + '%'
};
}
}
// Usage example (processItem is a placeholder for your own logic)
const monitor = new LoopPerformanceMonitor();
for (const item of $input.all()) {
monitor.startItem();
try {
await processItem(item.json);
monitor.endItem(true);
} catch (error) {
monitor.endItem(false);
console.error('Item processing failed:', error);
}
}
const stats = monitor.getStats();
console.log('Loop performance stats:', stats);
🎯 Best Practices
1. Choose the Right Looping Approach
// Small datasets (< 100 items): process directly
if (items.length < 100) {
return items.map(processItem);
}
// Medium datasets (100-1000 items): Split In Batches
else if (items.length < 1000) {
// Use Split In Batches with a batch size of 20-50
}
// Large datasets (> 1000 items): paginated processing
else {
// Use a paginated API plus state management
}
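The thresholds above can be wrapped in a small helper so the decision is explicit and testable. The cutoffs and strategy names here are illustrative, not prescribed by N8N:

```javascript
// Pick a processing strategy based on dataset size.
function chooseLoopStrategy(itemCount) {
  if (itemCount < 100) return 'direct';            // map over items inline
  if (itemCount < 1000) return 'split-in-batches'; // batch size 20-50
  return 'paginated';                              // paginated API + state management
}

console.log(chooseLoopStrategy(50));   // "direct"
console.log(chooseLoopStrategy(500));  // "split-in-batches"
console.log(chooseLoopStrategy(5000)); // "paginated"
```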
2. Error Handling Strategy
// Fault-tolerant handling: record errors but keep going
const results = [];
const errors = [];
for (const item of items) {
try {
const result = await processItem(item);
results.push(result);
} catch (error) {
errors.push({
item: item,
error: error.message,
timestamp: new Date().toISOString()
});
// Critical errors: stop processing
if (error.critical) {
throw error;
}
}
}
3. Progress Tracking
// Real-time progress updates
const totalItems = items.length;
let processedItems = 0;
for (const item of items) {
await processItem(item);
processedItems++;
// Report progress at each 10% milestone
if (processedItems % Math.ceil(totalItems / 10) === 0) {
const progress = (processedItems / totalItems * 100).toFixed(1);
console.log(`Progress: ${progress}% (${processedItems}/${totalItems})`);
// Optional: send a progress notification (sendProgressUpdate is a placeholder)
await sendProgressUpdate(progress);
}
}
🚀 Next Steps
Once you have mastered loops and iteration, continue on to the next chapters.
Loops and iteration are core skills for processing data at scale. By choosing the right looping approach and optimizing your processing logic, you can build efficient, reliable batch-processing workflows!