数据流转
深入理解N8N中的数据流转机制,掌握数据在节点间的传递和处理
数据流转
数据流转是 N8N 工作流的核心概念。理解数据如何在节点间传递、转换和处理,是构建高效工作流的关键。
🔄 数据流基础概念
数据项 (Data Items)
N8N 中的数据以 数据项 的形式在节点间传递:
// 单个数据项结构
{
json: {
id: 1,
name: "张三",
email: "[email protected]"
},
binary: {}, // 二进制数据(如文件)
pairedItem: { item: 0, input: 0 } // 追踪数据来源
}
数据批次 (Data Batches)
多个数据项组成一个批次:
// 数据批次示例
[
{ json: { id: 1, name: "张三" } },
{ json: { id: 2, name: "李四" } },
{ json: { id: 3, name: "王五" } }
]
📊 数据引用语法
基本引用
// 当前数据项
{{ $json.fieldName }}
// 指定节点的数据
{{ $('NodeName').item.json.fieldName }}
// 所有数据项
{{ $('NodeName').all() }}
// 第一个数据项
{{ $('NodeName').first() }}
// 最后一个数据项
{{ $('NodeName').last() }}
数组和对象操作
// 数组长度
{{ $json.items.length }}
// 数组元素
{{ $json.items[0] }}
{{ $json.items[$json.index] }}
// 对象属性
{{ $json.user.profile.name }}
// 安全访问(避免 undefined 错误)
{{ $json.user?.profile?.name }}
// 动态属性访问
{{ $json[$json.dynamicKey] }}
🔀 数据传递模式
1. 顺序传递(Sequential)
数据按顺序逐个处理:
// 输入:3个用户数据
[
{ json: { id: 1, name: "张三" } },
{ json: { id: 2, name: "李四" } },
{ json: { id: 3, name: "王五" } }
]
// HTTP Request 节点会为每个用户发送单独请求
// 输出:3个API响应
[
{ json: { userId: 1, profile: {...} } },
{ json: { userId: 2, profile: {...} } },
{ json: { userId: 3, profile: {...} } }
]
2. 批量传递(Batch)
所有数据作为一个批次处理:
// Function 节点中处理所有数据
const allUsers = $input.all();
const processed = allUsers.map(item => ({
json: {
...item.json,
processedAt: new Date().toISOString()
}
}));
return processed;
3. 合并传递(Merge)
来自多个分支的数据合并:
// Merge 节点配置
Mode: "Merge By Position"
// 或
Mode: "Merge By Key"
Join On: "id"
🔧 数据转换技巧
Set 节点数据设置
// 基本字段设置
{
"fullName": "{{ $json.firstName }} {{ $json.lastName }}",
"email": "{{ $json.email.toLowerCase() }}",
"age": "{{ parseInt($json.age) }}",
"isAdult": "{{ $json.age >= 18 }}",
"createdAt": "{{ new Date().toISOString() }}"
}
// 条件字段设置
{
"status": "{{ $json.score >= 80 ? 'pass' : 'fail' }}",
"level": "{{ $json.points > 1000 ? 'premium' : 'basic' }}",
"discount": "{{ $json.isVIP ? 0.2 : 0.1 }}"
}
// 数组和对象处理
{
"tags": "{{ $json.categories.map(c => c.name) }}",
"totalAmount": "{{ $json.orders.reduce((sum, order) => sum + order.amount, 0) }}",
"firstOrder": "{{ $json.orders[0] }}"
}
Function 节点高级处理
// 数据清洗
const items = $input.all();
const cleanedData = items.map(item => {
const data = item.json;
return {
json: {
id: data.id,
name: data.name?.trim(),
email: data.email?.toLowerCase(),
phone: data.phone?.replace(/\D/g, ''),
age: parseInt(data.age) || 0,
tags: Array.isArray(data.tags) ? data.tags.filter(Boolean) : []
}
};
});
return cleanedData;
Code 节点复杂逻辑
// 数据分组
const groupedData = {};
for (const item of $input.all()) {
const category = item.json.category;
if (!groupedData[category]) {
groupedData[category] = [];
}
groupedData[category].push(item.json);
}
// 输出分组结果
for (const [category, items] of Object.entries(groupedData)) {
$return.push({
json: {
category: category,
items: items,
count: items.length,
totalValue: items.reduce((sum, item) => sum + item.value, 0)
}
});
}
📋 实战案例:电商订单数据处理
场景描述
处理电商平台的订单数据,包括数据清洗、计算、分类和通知。
工作流设计
定时触发 → 获取订单 → 数据清洗 → 计算统计 → 分类处理 → 发送通知
1. 获取原始订单数据
// HTTP Request 获取订单
GET /api/orders?date=today
// 返回数据结构
[
{
id: "ORD001",
customer: { name: "张三", email: "[email protected]" },
items: [
{ product: "iPhone", price: 8999, quantity: 1 },
{ product: "Case", price: 199, quantity: 2 }
],
status: "pending",
createdAt: "2024-01-15T10:30:00Z"
}
]
2. 数据清洗和计算
// Function 节点:订单数据处理
const orders = $input.all();
const processedOrders = orders.map(item => {
const order = item.json;
// 计算订单总额
const totalAmount = order.items.reduce((sum, item) =>
sum + (item.price * item.quantity), 0
);
// 计算商品总数
const totalQuantity = order.items.reduce((sum, item) =>
sum + item.quantity, 0
);
// 订单分类
let category = 'normal';
if (totalAmount >= 10000) category = 'high-value';
else if (totalAmount >= 5000) category = 'medium-value';
else if (totalAmount < 100) category = 'low-value';
return {
json: {
orderId: order.id,
customerName: order.customer.name,
customerEmail: order.customer.email,
totalAmount: totalAmount,
totalQuantity: totalQuantity,
category: category,
status: order.status,
orderDate: order.createdAt,
// 保留原始数据
originalData: order
}
};
});
return processedOrders;
3. 数据分流处理
// Switch 节点:按订单类别分流
switch ($json.category) {
case 'high-value':
return 0; // 高价值订单处理分支
case 'medium-value':
return 1; // 中等价值订单分支
case 'low-value':
return 2; // 低价值订单分支
default:
return 3; // 普通订单分支
}
4. 高价值订单特殊处理
// Set 节点:为高价值订单添加特殊标记
{
"orderId": "{{ $json.orderId }}",
"customerName": "{{ $json.customerName }}",
"totalAmount": "{{ $json.totalAmount }}",
"priority": "high",
"assignedTo": "vip-team",
"specialInstructions": "VIP客户订单,优先处理",
"estimatedProcessingTime": "2小时",
"notificationRequired": true
}
5. 数据聚合统计
// Aggregate 节点:生成日报统计
{
"keys": ["category"],
"values": {
"orderCount": { "field": "orderId", "operation": "count" },
"totalRevenue": { "field": "totalAmount", "operation": "sum" },
"avgOrderValue": { "field": "totalAmount", "operation": "average" },
"maxOrderValue": { "field": "totalAmount", "operation": "max" }
}
}
🔍 数据流调试技巧
1. 使用 Set 节点检查数据
// 在关键节点后添加 Set 节点查看数据
{
"debug_step": "after_data_processing",
"debug_timestamp": "{{ new Date().toISOString() }}",
"debug_data_type": "{{ typeof $json }}",
"debug_data_keys": "{{ Object.keys($json) }}",
"debug_data_sample": "{{ JSON.stringify($json).substring(0, 200) }}",
"original_data": "{{ $json }}"
}
2. Function 节点日志输出
// 详细日志
const items = $input.all();
console.log(`Processing ${items.length} items`);
items.forEach((item, index) => {
console.log(`Item ${index}:`, JSON.stringify(item.json, null, 2));
});
// 数据验证日志
const validItems = items.filter(item => {
const isValid = item.json.id && item.json.email;
if (!isValid) {
console.log('Invalid item found:', item.json);
}
return isValid;
});
console.log(`Valid items: ${validItems.length}/${items.length}`);
3. 错误处理和数据恢复
// 在 Function 节点中添加错误处理
try {
const processedData = items.map(processItem);
return processedData;
} catch (error) {
console.error('Processing error:', error);
// 返回原始数据和错误信息
return items.map(item => ({
json: {
...item.json,
processingError: error.message,
processedAt: new Date().toISOString()
}
}));
}
🎯 数据流最佳实践
1. 数据验证
// 在处理前验证数据完整性
const requiredFields = ['id', 'email', 'name'];
const invalidItems = items.filter(item =>
requiredFields.some(field => !item.json[field])
);
if (invalidItems.length > 0) {
console.warn(`Found ${invalidItems.length} invalid items`);
// 处理无效数据...
}
2. 性能优化
// 避免在循环中进行复杂计算
const expensiveData = computeOnce(); // 只计算一次
const results = items.map(item => ({
json: {
...item.json,
computed: expensiveData[item.json.id] // 使用预计算结果
}
}));
3. 内存管理
// 处理大数据集时分批处理
const batchSize = 100;
const batches = [];
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
const processed = processBatch(batch);
batches.push(...processed);
}
return batches;
🚀 下一步学习
掌握数据流转后,你可以继续学习:
数据流转是 N8N 的生命线。掌握数据在节点间的流动规律,你就能构建出高效、稳定的自动化工作流!