Logon8n中文教程

数据流转

深入理解N8N中的数据流转机制,掌握数据在节点间的传递和处理

数据流转

数据流转是 N8N 工作流的核心概念。理解数据如何在节点间传递、转换和处理,是构建高效工作流的关键。

🔄 数据流基础概念

数据项 (Data Items)

N8N 中的数据以 数据项 的形式在节点间传递:

// 单个数据项结构
{
  json: {
    id: 1,
    name: "张三",
    email: "[email protected]"
  },
  binary: {},  // 二进制数据(如文件)
  pairedItem: { item: 0, input: 0 }  // 追踪数据来源
}

数据批次 (Data Batches)

多个数据项组成一个批次:

// 数据批次示例
[
  { json: { id: 1, name: "张三" } },
  { json: { id: 2, name: "李四" } }, 
  { json: { id: 3, name: "王五" } }
]

📊 数据引用语法

基本引用

// 当前数据项
{{ $json.fieldName }}

// 指定节点的数据
{{ $('NodeName').item.json.fieldName }}

// 所有数据项
{{ $('NodeName').all() }}

// 第一个数据项
{{ $('NodeName').first() }}

// 最后一个数据项  
{{ $('NodeName').last() }}

数组和对象操作

// 数组长度
{{ $json.items.length }}

// 数组元素
{{ $json.items[0] }}
{{ $json.items[$json.index] }}

// 对象属性
{{ $json.user.profile.name }}

// 安全访问(避免 undefined 错误)
{{ $json.user?.profile?.name }}

// 动态属性访问
{{ $json[$json.dynamicKey] }}

🔀 数据传递模式

1. 顺序传递(Sequential)

数据按顺序逐个处理:

// 输入:3个用户数据
[
  { json: { id: 1, name: "张三" } },
  { json: { id: 2, name: "李四" } },
  { json: { id: 3, name: "王五" } }
]

// HTTP Request 节点会为每个用户发送单独请求
// 输出:3个API响应
[
  { json: { userId: 1, profile: {...} } },
  { json: { userId: 2, profile: {...} } },
  { json: { userId: 3, profile: {...} } }
]

2. 批量传递(Batch)

所有数据作为一个批次处理:

// Function 节点中处理所有数据
const allUsers = $input.all();
const processed = allUsers.map(item => ({
  json: {
    ...item.json,
    processedAt: new Date().toISOString()
  }
}));
return processed;

3. 合并传递(Merge)

来自多个分支的数据合并:

// Merge 节点配置
Mode: "Merge By Position"
// 或
Mode: "Merge By Key"
Join On: "id"

🔧 数据转换技巧

Set 节点数据设置

// 基本字段设置
{
  "fullName": "{{ $json.firstName }} {{ $json.lastName }}",
  "email": "{{ $json.email.toLowerCase() }}",
  "age": "{{ parseInt($json.age) }}",
  "isAdult": "{{ $json.age >= 18 }}",
  "createdAt": "{{ new Date().toISOString() }}"
}

// 条件字段设置
{
  "status": "{{ $json.score >= 80 ? 'pass' : 'fail' }}",
  "level": "{{ $json.points > 1000 ? 'premium' : 'basic' }}",
  "discount": "{{ $json.isVIP ? 0.2 : 0.1 }}"
}

// 数组和对象处理
{
  "tags": "{{ $json.categories.map(c => c.name) }}",
  "totalAmount": "{{ $json.orders.reduce((sum, order) => sum + order.amount, 0) }}",
  "firstOrder": "{{ $json.orders[0] }}"
}

Function 节点高级处理

// 数据清洗
const items = $input.all();
const cleanedData = items.map(item => {
  const data = item.json;
  
  return {
    json: {
      id: data.id,
      name: data.name?.trim(),
      email: data.email?.toLowerCase(),
      phone: data.phone?.replace(/\D/g, ''),
      age: parseInt(data.age) || 0,
      tags: Array.isArray(data.tags) ? data.tags.filter(Boolean) : []
    }
  };
});

return cleanedData;

Code 节点复杂逻辑

// 数据分组
const groupedData = {};
for (const item of $input.all()) {
  const category = item.json.category;
  if (!groupedData[category]) {
    groupedData[category] = [];
  }
  groupedData[category].push(item.json);
}

// 输出分组结果
for (const [category, items] of Object.entries(groupedData)) {
  $return.push({
    json: {
      category: category,
      items: items,
      count: items.length,
      totalValue: items.reduce((sum, item) => sum + item.value, 0)
    }
  });
}

📋 实战案例:电商订单数据处理

场景描述

处理电商平台的订单数据,包括数据清洗、计算、分类和通知。

工作流设计

定时触发 → 获取订单 → 数据清洗 → 计算统计 → 分类处理 → 发送通知

1. 获取原始订单数据

// HTTP Request 获取订单
GET /api/orders?date=today

// 返回数据结构
[
  {
    id: "ORD001",
    customer: { name: "张三", email: "[email protected]" },
    items: [
      { product: "iPhone", price: 8999, quantity: 1 },
      { product: "Case", price: 199, quantity: 2 }
    ],
    status: "pending",
    createdAt: "2024-01-15T10:30:00Z"
  }
]

2. 数据清洗和计算

// Function 节点:订单数据处理
const orders = $input.all();
const processedOrders = orders.map(item => {
  const order = item.json;
  
  // 计算订单总额
  const totalAmount = order.items.reduce((sum, item) => 
    sum + (item.price * item.quantity), 0
  );
  
  // 计算商品总数
  const totalQuantity = order.items.reduce((sum, item) => 
    sum + item.quantity, 0
  );
  
  // 订单分类
  let category = 'normal';
  if (totalAmount >= 10000) category = 'high-value';
  else if (totalAmount >= 5000) category = 'medium-value';
  else if (totalAmount < 100) category = 'low-value';
  
  return {
    json: {
      orderId: order.id,
      customerName: order.customer.name,
      customerEmail: order.customer.email,
      totalAmount: totalAmount,
      totalQuantity: totalQuantity,
      category: category,
      status: order.status,
      orderDate: order.createdAt,
      // 保留原始数据
      originalData: order
    }
  };
});

return processedOrders;

3. 数据分流处理

// Switch 节点:按订单类别分流
switch ($json.category) {
  case 'high-value':
    return 0;  // 高价值订单处理分支
  case 'medium-value':
    return 1;  // 中等价值订单分支
  case 'low-value':
    return 2;  // 低价值订单分支
  default:
    return 3;  // 普通订单分支
}

4. 高价值订单特殊处理

// Set 节点:为高价值订单添加特殊标记
{
  "orderId": "{{ $json.orderId }}",
  "customerName": "{{ $json.customerName }}",
  "totalAmount": "{{ $json.totalAmount }}",
  "priority": "high",
  "assignedTo": "vip-team",
  "specialInstructions": "VIP客户订单,优先处理",
  "estimatedProcessingTime": "2小时",
  "notificationRequired": true
}

5. 数据聚合统计

// Aggregate 节点:生成日报统计
{
  "keys": ["category"],
  "values": {
    "orderCount": { "field": "orderId", "operation": "count" },
    "totalRevenue": { "field": "totalAmount", "operation": "sum" },
    "avgOrderValue": { "field": "totalAmount", "operation": "average" },
    "maxOrderValue": { "field": "totalAmount", "operation": "max" }
  }
}

🔍 数据流调试技巧

1. 使用 Set 节点检查数据

// 在关键节点后添加 Set 节点查看数据
{
  "debug_step": "after_data_processing",
  "debug_timestamp": "{{ new Date().toISOString() }}",
  "debug_data_type": "{{ typeof $json }}",
  "debug_data_keys": "{{ Object.keys($json) }}",
  "debug_data_sample": "{{ JSON.stringify($json).substring(0, 200) }}",
  "original_data": "{{ $json }}"
}

2. Function 节点日志输出

// 详细日志
const items = $input.all();
console.log(`Processing ${items.length} items`);

items.forEach((item, index) => {
  console.log(`Item ${index}:`, JSON.stringify(item.json, null, 2));
});

// 数据验证日志
const validItems = items.filter(item => {
  const isValid = item.json.id && item.json.email;
  if (!isValid) {
    console.log('Invalid item found:', item.json);
  }
  return isValid;
});

console.log(`Valid items: ${validItems.length}/${items.length}`);

3. 错误处理和数据恢复

// 在 Function 节点中添加错误处理
try {
  const processedData = items.map(processItem);
  return processedData;
} catch (error) {
  console.error('Processing error:', error);
  
  // 返回原始数据和错误信息
  return items.map(item => ({
    json: {
      ...item.json,
      processingError: error.message,
      processedAt: new Date().toISOString()
    }
  }));
}

🎯 数据流最佳实践

1. 数据验证

// 在处理前验证数据完整性
const requiredFields = ['id', 'email', 'name'];
const invalidItems = items.filter(item => 
  requiredFields.some(field => !item.json[field])
);

if (invalidItems.length > 0) {
  console.warn(`Found ${invalidItems.length} invalid items`);
  // 处理无效数据...
}

2. 性能优化

// 避免在循环中进行复杂计算
const expensiveData = computeOnce();  // 只计算一次

const results = items.map(item => ({
  json: {
    ...item.json,
    computed: expensiveData[item.json.id]  // 使用预计算结果
  }
}));

3. 内存管理

// 处理大数据集时分批处理
const batchSize = 100;
const batches = [];

for (let i = 0; i < items.length; i += batchSize) {
  const batch = items.slice(i, i + batchSize);
  const processed = processBatch(batch);
  batches.push(...processed);
}

return batches;

🚀 下一步学习

掌握数据流转后,你可以继续学习:

  1. 触发器与动作 - 了解工作流的启动机制
  2. 调试测试 - 学会调试工作流
  3. 数据转换 - 高级数据处理技巧

数据流转是 N8N 的生命线。掌握数据在节点间的流动规律,你就能构建出高效、稳定的自动化工作流!