n8n 中文教程

Webhook与API集成

掌握N8N中的Webhook和API集成技巧,实现与外部系统的无缝连接

Webhook与API集成

Webhook 和 API 集成是 N8N 连接外部世界的核心技术。通过这些技术,你可以让 N8N 接收外部事件、调用外部服务,构建真正的系统集成解决方案。

🔗 Webhook 基础

Webhook 概念

Webhook 是一种"反向 API",允许外部系统在特定事件发生时主动推送数据到你的 N8N 工作流。

// Webhook 工作原理
外部系统事件 → HTTP POST 请求 → N8N Webhook 节点 → 触发工作流执行

Webhook 节点配置

// 基本配置
HTTP Method: POST, GET, PUT, DELETE
Path: order-created  // 自定义路径(N8N 会自动加上 /webhook/ 前缀)
Authentication: None, Basic Auth, Header Auth
Response Mode: "On Received Call", "Last Node"

// 生成的 URL 格式
https://your-n8n.com/webhook/order-created
https://your-n8n.com/webhook-test/order-created  // 测试 URL

📥 Webhook 接收数据

处理不同格式的数据

// JSON 数据处理
// Webhook 接收到的数据在 $json 中
{
  "event": "order.created",
  "orderId": "ORD-12345",
  "customer": {
    "id": "CUST-67890",
    "name": "张三",
    "email": "zhangsan@example.com"
  },
  "items": [
    {
      "productId": "PROD-001",
      "quantity": 2,
      "price": 299.99
    }
  ],
  "total": 599.98,
  "timestamp": "2024-01-15T10:30:00Z"
}

// 在后续节点中访问数据
{{ $json.orderId }}           // ORD-12345
{{ $json.customer.name }}     // 张三
{{ $json.items[0].productId }} // PROD-001
{{ $json.total }}             // 599.98

表单数据处理

// Form URL Encoded 数据
// 来自 HTML 表单的数据
name=张三&email=zhangsan@example.com&message=产品咨询

// 在 N8N 中访问
{{ $json.name }}      // 张三
{{ $json.email }}     // zhangsan@example.com
{{ $json.message }}   // 产品咨询

文件上传处理

// Webhook 配置
Options: {
  "binaryData": true  // 启用二进制数据处理
}

// 处理上传的文件
// Function 节点
const fileData = $input.first().binary;

if (fileData && fileData.data) {
  const fileName = fileData.fileName || 'uploaded_file';
  const mimeType = fileData.mimeType;
  const fileSize = fileData.data.length;
  
  console.log(`接收到文件: ${fileName}, 类型: ${mimeType}, 大小: ${fileSize} bytes`);
  
  // 保存文件到本地或云存储
  const fs = require('fs');
  const path = `/uploads/${fileName}`;
  
  // 如果是 base64 编码的数据
  const buffer = Buffer.from(fileData.data, 'base64');
  fs.writeFileSync(path, buffer);
  
  return [{
    json: {
      success: true,
      fileName: fileName,
      filePath: path,
      fileSize: fileSize,
      uploadedAt: new Date().toISOString()
    }
  }];
}

🔐 Webhook 安全

签名验证

// Function 节点:验证 Webhook 签名
const crypto = require('crypto');

/**
 * Check a GitHub-style HMAC-SHA256 webhook signature.
 *
 * @param {string|Buffer} payload - The exact bytes the sender signed.
 * @param {string} signature - Hex digest, optionally prefixed with "sha256=".
 * @param {string|Buffer} secret - Shared secret used as the HMAC key.
 * @returns {boolean} true only when the provided digest matches the expected one.
 */
function verifyWebhookSignature(payload, signature, secret) {
  if (typeof signature !== 'string') {
    return false;
  }

  const expectedDigest = crypto
    .createHmac('sha256', secret)
    .update(payload)
    .digest();

  const providedHex = signature.replace('sha256=', '');

  // timingSafeEqual throws a RangeError when the buffers differ in length,
  // and Buffer.from(..., 'hex') silently truncates at the first non-hex
  // character. Reject anything that is not a full SHA-256 hex digest up front
  // so malformed input yields `false` rather than an exception.
  if (!/^[0-9a-f]{64}$/i.test(providedHex)) {
    return false;
  }

  return crypto.timingSafeEqual(expectedDigest, Buffer.from(providedHex, 'hex'));
}

// 获取请求数据
const payload = JSON.stringify($json);
const signature = $request.headers['x-hub-signature-256'];
const secret = $env.WEBHOOK_SECRET;

// 验证签名
if (!signature) {
  throw new Error('缺少签名头');
}

if (!verifyWebhookSignature(payload, signature, secret)) {
  throw new Error('签名验证失败');
}

console.log('Webhook 签名验证成功');
return [{ json: { verified: true, ...$json } }];

IP 白名单验证

// Function 节点:IP 地址验证
const allowedIPs = [
  '192.168.1.0/24',    // 内网段
  '203.0.113.1',       // 特定IP
  '140.82.112.0/20'    // GitHub IP段
];

function isIPAllowed(clientIP, allowedList) {
  const { parse } = require('ipaddr.js');
  
  try {
    const client = parse(clientIP);
    
    for (const allowed of allowedList) {
      if (allowed.includes('/')) {
        // CIDR 格式
        const [ip, prefix] = allowed.split('/');
        const network = parse(ip);
        if (client.match(network, parseInt(prefix))) {
          return true;
        }
      } else {
        // 单个 IP
        if (client.toString() === allowed) {
          return true;
        }
      }
    }
    
    return false;
  } catch (error) {
    console.error('IP 解析错误:', error);
    return false;
  }
}

// 获取客户端 IP
const clientIP = $request.headers['x-forwarded-for'] || 
                 $request.headers['x-real-ip'] || 
                 $request.connection?.remoteAddress;

if (!isIPAllowed(clientIP, allowedIPs)) {
  throw new Error(`IP 地址 ${clientIP} 不在白名单中`);
}

console.log(`IP 验证通过: ${clientIP}`);
return [{ json: { ...$json, clientIP } }];

API Key 认证

// Webhook 节点配置
Authentication: "Header Auth"
Header Name: "X-API-Key"
Header Value: "{{ $env.WEBHOOK_API_KEY }}"

// 或者在 Function 节点中验证
// Validate the X-API-Key header against the configured key.
const { createHash, timingSafeEqual } = require('crypto');

const providedKey = $request.headers['x-api-key'];
const validKey = $env.WEBHOOK_API_KEY;

// Hash both values to fixed-length digests: timingSafeEqual requires equal
// lengths, and a constant-time comparison avoids leaking how much of the key
// matched (which a plain `!==` can do through timing).
const digestOf = (value) => createHash('sha256').update(String(value)).digest();

const keysMatch = Boolean(providedKey) &&
                  Boolean(validKey) &&
                  timingSafeEqual(digestOf(providedKey), digestOf(validKey));

if (!keysMatch) {
  throw new Error('无效的 API Key');
}

🌐 HTTP Request API 调用

基础 API 调用

// HTTP Request 节点配置
Method: POST
URL: https://api.example.com/users
Headers: {
  "Content-Type": "application/json",
  "Authorization": "Bearer {{ $env.API_TOKEN }}"
}
Body: {
  "name": "{{ $json.name }}",
  "email": "{{ $json.email }}",
  "metadata": {
    "source": "n8n",
    "timestamp": "{{ new Date().toISOString() }}"
  }
}

动态 URL 构建

// 使用表达式构建动态 URL
URL: "https://api.example.com/users/{{ $json.userId }}/orders"

// 或者使用 Function 节点构建复杂 URL
const baseUrl = 'https://api.example.com';
const endpoint = '/search';
const params = new URLSearchParams({
  q: $json.searchTerm,
  page: $json.page || 1,
  limit: $json.limit || 20,
  sort: $json.sortBy || 'created_at',
  order: $json.sortOrder || 'desc'
});

const fullUrl = `${baseUrl}${endpoint}?${params.toString()}`;

return [{
  json: {
    url: fullUrl,
    originalData: $json
  }
}];

请求头管理

// Function 节点:动态请求头
const headers = {
  'Content-Type': 'application/json',
  'User-Agent': 'N8N-Automation/1.0',
  'X-Request-ID': crypto.randomUUID(),
  'X-Timestamp': new Date().toISOString()
};

// 根据环境添加认证头
if ($env.NODE_ENV === 'production') {
  headers['Authorization'] = `Bearer ${$env.PROD_API_TOKEN}`;
} else {
  headers['Authorization'] = `Bearer ${$env.DEV_API_TOKEN}`;
}

// 根据 API 版本添加版本头
if ($json.apiVersion) {
  headers['API-Version'] = $json.apiVersion;
}

// 添加自定义业务头
if ($json.tenantId) {
  headers['X-Tenant-ID'] = $json.tenantId;
}

return [{
  json: {
    headers: headers,
    requestData: $json
  }
}];

🔄 API 响应处理

响应状态处理

// Function 节点:处理不同的响应状态
const response = $input.first().json;
const statusCode = response.statusCode || $json.statusCode;

switch (Math.floor(statusCode / 100)) {
  case 2: // 2xx 成功
    return handleSuccessResponse(response);
    
  case 3: // 3xx 重定向
    return handleRedirectResponse(response);
    
  case 4: // 4xx 客户端错误
    return handleClientError(response);
    
  case 5: // 5xx 服务器错误
    return handleServerError(response);
    
  default:
    throw new Error(`未知的状态码: ${statusCode}`);
}

/**
 * Wrap a successful response into a single output item.
 *
 * @param {object} response - Response data; `body` is used as the payload when
 *   it is truthy, otherwise the whole response object is.
 * @returns {Array<{json: object}>} One item with `success`, `data` and `statusCode`.
 */
function handleSuccessResponse(response) {
  console.log('API 调用成功');

  const { body, statusCode } = response;
  const item = {
    json: {
      success: true,
      data: body || response,
      statusCode
    }
  };

  return [item];
}

/**
 * Turn a 4xx response into a descriptive Error. Always throws.
 *
 * @param {object} response - Response data with `statusCode` and optional `body.message`.
 * @throws {Error} With a message specific to 400/401/403/404/429, or a generic
 *   client-error message for any other status.
 */
function handleClientError(response) {
  const { statusCode } = response;

  // Read lazily: only the 400 and fallback branches use the server's message.
  const serverMessage = () => response.body?.message || '未知错误';

  // Statuses whose message does not depend on the response body.
  const fixedMessages = new Map([
    [401, '认证失败,请检查 API Token'],
    [403, '权限不足,无法访问该资源'],
    [404, '请求的资源不存在'],
    [429, '请求频率超限,请稍后重试']
  ]);

  if (statusCode === 400) {
    throw new Error(`请求参数错误: ${serverMessage()}`);
  }

  if (fixedMessages.has(statusCode)) {
    throw new Error(fixedMessages.get(statusCode));
  }

  throw new Error(`客户端错误 ${statusCode}: ${serverMessage()}`);
}

/**
 * Turn a 5xx response into an Error. Always throws, so the caller's retry or
 * error handling decides what happens next.
 *
 * @param {object} response - Response data with `statusCode` and optional `body.message`.
 * @throws {Error} Carrying the status code and the server's message (or a default).
 */
function handleServerError(response) {
  const { statusCode, body } = response;
  const detail = body?.message || '服务器内部错误';

  throw new Error(`服务器错误 ${statusCode}: ${detail}`);
}

分页数据处理

// Function 节点:处理分页 API 响应
const allData = [];
let currentPage = 1;
const maxPages = 10; // 防止无限循环

async function fetchAllPages() {
  while (currentPage <= maxPages) {
    console.log(`获取第 ${currentPage} 页数据`);
    
    const response = await fetch(`${$json.apiUrl}?page=${currentPage}&limit=100`, {
      headers: {
        'Authorization': `Bearer ${$env.API_TOKEN}`
      }
    });
    
    const data = await response.json();
    
    if (!data.items || data.items.length === 0) {
      console.log('没有更多数据,停止分页');
      break;
    }
    
    allData.push(...data.items);
    
    // 检查是否还有下一页
    if (!data.hasNextPage || currentPage >= data.totalPages) {
      console.log('已获取所有页面数据');
      break;
    }
    
    currentPage++;
    
    // 添加延迟避免 API 限流
    await new Promise(resolve => setTimeout(resolve, 100));
  }
  
  return allData;
}

const allItems = await fetchAllPages();

console.log(`总共获取 ${allItems.length} 条数据`);

return [{
  json: {
    totalItems: allItems.length,
    items: allItems,
    pagesProcessed: currentPage - 1
  }
}];

🔧 高级集成技巧

OAuth 2.0 认证流程

// Function 节点:OAuth 2.0 Token 管理
/**
 * Keeps an OAuth 2.0 access token available, refreshing it through the
 * refresh-token grant shortly before it expires. Tokens are kept in the
 * workflow's static data object.
 */
class OAuth2TokenManager {
  constructor() {
    this.clientId = $env.OAUTH_CLIENT_ID;
    this.clientSecret = $env.OAUTH_CLIENT_SECRET;
    this.tokenUrl = $env.OAUTH_TOKEN_URL;
    // NOTE(review): verify that `$node.getWorkflowStaticData` is the accessor
    // exposed by the node type this code runs in.
    this.staticData = $node.getWorkflowStaticData('global');
  }

  /**
   * Return the stored access token, refreshing it first when it is missing or
   * within five minutes of expiry.
   *
   * @returns {Promise<string>} A usable access token.
   */
  async getValidToken() {
    const now = Date.now();
    const token = this.staticData.accessToken;
    const expiresAt = this.staticData.tokenExpiresAt || 0;

    // Refresh five minutes (300000 ms) early so the token does not expire mid-request.
    if (!token || now > (expiresAt - 300000)) {
      console.log('Token 已过期或即将过期,刷新 token');
      return await this.refreshToken();
    }

    return token;
  }

  /**
   * Exchange the stored refresh token for a new access token and persist it.
   *
   * @returns {Promise<string>} The new access token.
   * @throws {Error} When no refresh token is stored, the token endpoint
   *   answers with an error, or the response carries no access token.
   */
  async refreshToken() {
    const refreshToken = this.staticData.refreshToken;

    if (!refreshToken) {
      throw new Error('缺少 refresh token,需要重新授权');
    }

    try {
      const basicCredentials = Buffer.from(`${this.clientId}:${this.clientSecret}`).toString('base64');

      const response = await fetch(this.tokenUrl, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/x-www-form-urlencoded',
          'Authorization': `Basic ${basicCredentials}`
        },
        body: new URLSearchParams({
          grant_type: 'refresh_token',
          refresh_token: refreshToken
        })
      });

      // Read the body as text first: an error page from a gateway is often not
      // JSON, and a parse failure must not hide the real HTTP error.
      const rawBody = await response.text();
      let tokenData = null;
      try {
        tokenData = JSON.parse(rawBody);
      } catch {
        // Non-JSON body; the status check below reports the failure.
      }

      if (!response.ok) {
        const reason = tokenData?.error_description || tokenData?.error || `HTTP ${response.status}`;
        throw new Error(`Token 刷新失败: ${reason}`);
      }

      if (!tokenData?.access_token) {
        throw new Error('Token 刷新失败: 响应中缺少 access_token');
      }

      // Persist the new token.
      this.staticData.accessToken = tokenData.access_token;

      // Without a usable expires_in, store 0 so the next call refreshes again
      // rather than persisting NaN.
      const lifetimeSeconds = Number(tokenData.expires_in);
      this.staticData.tokenExpiresAt = Number.isFinite(lifetimeSeconds) && lifetimeSeconds > 0
        ? Date.now() + (lifetimeSeconds * 1000)
        : 0;

      // Some servers rotate the refresh token on every use.
      if (tokenData.refresh_token) {
        this.staticData.refreshToken = tokenData.refresh_token;
      }

      console.log('Token 刷新成功');
      return tokenData.access_token;

    } catch (error) {
      console.error('Token 刷新失败:', error);
      throw error;
    }
  }
}

// 使用 Token Manager
const tokenManager = new OAuth2TokenManager();
const accessToken = await tokenManager.getValidToken();

return [{
  json: {
    accessToken: accessToken,
    tokenValid: true
  }
}];

API 限流处理

// Function 节点:智能限流处理
/**
 * Client-side rate limiter: records request timestamps in the node's static
 * data, waits when the per-minute or per-hour budget is used up, and retries
 * once when the API itself answers with a rate-limit error.
 */
class RateLimitHandler {
  constructor() {
    // NOTE(review): verify that `$node.getWorkflowStaticData` is the accessor
    // exposed by the node type this code runs in.
    this.staticData = $node.getWorkflowStaticData('node');
    this.requests = this.staticData.requests || [];
    this.maxRequestsPerMinute = 60;
    this.maxRequestsPerHour = 1000;
  }

  /**
   * Run `apiCall` within the configured limits.
   *
   * @param {() => Promise<*>} apiCall - Performs the request; should reject
   *   with an error carrying `response` for HTTP failures.
   * @returns {Promise<*>} Whatever `apiCall` resolves to.
   * @throws Rethrows non-rate-limit errors, and any error from the single retry.
   */
  async executeWithRateLimit(apiCall) {
    await this.checkAndWaitForRateLimit();

    try {
      const result = await apiCall();
      this.recordRequest(true);
      return result;
    } catch (error) {
      this.recordRequest(false);

      // Only rate-limit failures are retried, and only once.
      if (this.isRateLimitError(error)) {
        console.log('触发 API 限流,等待后重试');
        await this.handleRateLimitError(error);

        const result = await apiCall();
        this.recordRequest(true);
        return result;
      }

      throw error;
    }
  }

  /**
   * Sleep until the sliding one-minute and one-hour windows have room for
   * another request.
   */
  async checkAndWaitForRateLimit() {
    const now = Date.now();
    const oneMinuteAgo = now - 60000;
    const oneHourAgo = now - 3600000;

    // Drop records that have left the one-hour window.
    this.requests = this.requests.filter(req => req.timestamp > oneHourAgo);

    // Per-minute budget.
    const recentRequests = this.requests.filter(req => req.timestamp > oneMinuteAgo);
    if (recentRequests.length >= this.maxRequestsPerMinute) {
      // Room appears once the request at this index ages out of the window,
      // i.e. at timestamp + 60s. Relative to now that is
      // (timestamp - oneMinuteAgo); the extra second is a safety margin.
      const blocking = recentRequests[recentRequests.length - this.maxRequestsPerMinute];
      const waitTime = Math.max(0, blocking.timestamp - oneMinuteAgo) + 1000;
      console.log(`达到每分钟限制,等待 ${waitTime}ms`);
      await this.sleep(waitTime);
    }

    // Per-hour budget, same reasoning with the one-hour window.
    if (this.requests.length >= this.maxRequestsPerHour) {
      const blocking = this.requests[this.requests.length - this.maxRequestsPerHour];
      const waitTime = Math.max(0, blocking.timestamp - oneHourAgo) + 1000;
      console.log(`达到每小时限制,等待 ${waitTime}ms`);
      await this.sleep(waitTime);
    }
  }

  /**
   * Append a request record and persist the history to static data.
   *
   * @param {boolean} success - Whether the request succeeded.
   */
  recordRequest(success) {
    const now = Date.now();
    this.requests.push({
      timestamp: now,
      success: success
    });

    this.staticData.requests = this.requests;
  }

  /**
   * @param {Error} error - Error thrown by the API call.
   * @returns {boolean} true for HTTP 429 or a message mentioning rate limiting.
   */
  isRateLimitError(error) {
    const message = String(error?.message ?? '').toLowerCase();
    const statusCode = error?.response?.status;

    return statusCode === 429 ||
           message.includes('rate limit') ||
           message.includes('too many requests');
  }

  /**
   * Wait as long as the API asks via Retry-After, or 60 seconds by default.
   *
   * @param {Error} error - Rate-limit error, optionally carrying `response.headers`.
   */
  async handleRateLimitError(error) {
    const headers = error?.response?.headers;

    // fetch() exposes headers as a Headers object (read with .get()); other
    // clients use a plain object keyed by lower-case name. Support both.
    let retryAfter;
    if (headers && typeof headers.get === 'function') {
      retryAfter = headers.get('retry-after');
    } else if (headers) {
      retryAfter = headers['retry-after'];
    }

    const waitTime = this.parseRetryAfter(retryAfter);

    if (waitTime !== null) {
      console.log(`API 返回 Retry-After: ${retryAfter}s,等待 ${waitTime}ms`);
      await this.sleep(waitTime);
    } else {
      // No usable hint from the server: wait 60 seconds.
      console.log('使用默认等待时间 60 秒');
      await this.sleep(60000);
    }
  }

  /**
   * Convert a Retry-After header value to milliseconds.
   *
   * @param {string|number|null|undefined} value - Delay in seconds or an HTTP date.
   * @returns {number|null} Milliseconds to wait, or null when the value is unusable.
   */
  parseRetryAfter(value) {
    if (value == null) {
      return null;
    }

    const text = String(value).trim();
    if (/^\d+$/.test(text)) {
      return Number.parseInt(text, 10) * 1000;
    }

    const retryAt = Date.parse(text);
    return Number.isNaN(retryAt) ? null : Math.max(0, retryAt - Date.now());
  }

  /**
   * @param {number} ms - Milliseconds to wait.
   * @returns {Promise<void>} Resolves after the delay.
   */
  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// 使用限流处理器
const rateLimitHandler = new RateLimitHandler();

const apiCall = async () => {
  return await fetch($json.apiUrl, {
    method: 'GET',
    headers: {
      'Authorization': `Bearer ${$env.API_TOKEN}`
    }
  }).then(response => {
    if (!response.ok) {
      const error = new Error(`API call failed: ${response.status}`);
      error.response = response;
      throw error;
    }
    return response.json();
  });
};

const result = await rateLimitHandler.executeWithRateLimit(apiCall);

return [{ json: result }];

批量 API 操作

// Function 节点:批量 API 处理
const items = $input.all();
const batchSize = 10;
const concurrentLimit = 3;
const results = [];
const errors = [];

async function processBatch(batch) {
  const promises = batch.map(async (item) => {
    try {
      const result = await callAPI(item.json);
      return { success: true, data: result, originalItem: item.json };
    } catch (error) {
      return { success: false, error: error.message, originalItem: item.json };
    }
  });
  
  return await Promise.allSettled(promises);
}

async function callAPI(itemData) {
  const response = await fetch(`${$env.API_BASE_URL}/items`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${$env.API_TOKEN}`
    },
    body: JSON.stringify(itemData)
  });
  
  if (!response.ok) {
    throw new Error(`API call failed: ${response.status} ${response.statusText}`);
  }
  
  return await response.json();
}

// 分批处理
for (let i = 0; i < items.length; i += batchSize) {
  const batch = items.slice(i, i + batchSize);
  console.log(`处理第 ${Math.floor(i/batchSize) + 1} 批,共 ${batch.length} 个项目`);
  
  const batchResults = await processBatch(batch);
  
  batchResults.forEach(result => {
    if (result.status === 'fulfilled') {
      if (result.value.success) {
        results.push(result.value);
      } else {
        errors.push(result.value);
      }
    } else {
      errors.push({
        success: false,
        error: result.reason.message,
        originalItem: null
      });
    }
  });
  
  // 批次间添加延迟
  if (i + batchSize < items.length) {
    await new Promise(resolve => setTimeout(resolve, 1000));
  }
}

console.log(`批量处理完成: 成功 ${results.length}, 失败 ${errors.length}`);

return [{
  json: {
    summary: {
      total: items.length,
      successful: results.length,
      failed: errors.length,
      successRate: ((results.length / items.length) * 100).toFixed(2) + '%'
    },
    results: results,
    errors: errors
  }
}];

🚀 实战案例:完整的第三方集成

CRM 系统集成示例

// Function 节点:CRM 系统完整集成
/**
 * Thin client for a CRM REST API: looks customers up by e-mail and creates or
 * updates them, retrying requests that fail for transient reasons.
 */
class CRMIntegration {
  constructor() {
    this.baseUrl = $env.CRM_BASE_URL;
    this.apiKey = $env.CRM_API_KEY;
    this.retryAttempts = 3;
    this.retryDelay = 2000; // base delay in ms; multiplied by the attempt number
  }

  /**
   * Create the customer, or update the existing record with the same e-mail.
   *
   * @param {object} customerData - Customer fields; `email` is used for the lookup.
   * @returns {Promise<{action: 'created'|'updated', customer: object}>}
   * @throws Rethrows any request failure after logging it.
   */
  async syncCustomerData(customerData) {
    console.log(`开始同步客户数据: ${customerData.email}`);

    try {
      // 1. Look for an existing customer with this e-mail.
      const existingCustomer = await this.findCustomerByEmail(customerData.email);

      if (existingCustomer) {
        // 2. Update the existing record.
        const updatedCustomer = await this.updateCustomer(existingCustomer.id, customerData);
        console.log(`客户信息已更新: ID ${updatedCustomer.id}`);
        return { action: 'updated', customer: updatedCustomer };
      } else {
        // 3. Create a new record.
        const newCustomer = await this.createCustomer(customerData);
        console.log(`新客户已创建: ID ${newCustomer.id}`);
        return { action: 'created', customer: newCustomer };
      }
    } catch (error) {
      console.error('客户数据同步失败:', error.message);
      throw error;
    }
  }

  /**
   * @param {string} email - Address to search for.
   * @returns {Promise<object|null>} The first matching customer, or null when there is none.
   */
  async findCustomerByEmail(email) {
    const response = await this.makeRequest('GET', `/customers/search?email=${encodeURIComponent(email)}`);
    return response.customers && response.customers.length > 0 ? response.customers[0] : null;
  }

  /**
   * @param {object} customerData - Source fields in camelCase.
   * @returns {Promise<object>} The parsed response of the create request.
   */
  async createCustomer(customerData) {
    const payload = {
      first_name: customerData.firstName,
      last_name: customerData.lastName,
      email: customerData.email,
      phone: customerData.phone,
      company: customerData.company,
      source: 'n8n_automation',
      custom_fields: {
        registration_date: customerData.registrationDate,
        preferred_language: customerData.language || 'zh-CN',
        marketing_consent: customerData.marketingConsent || false
      }
    };

    return await this.makeRequest('POST', '/customers', payload);
  }

  /**
   * @param {string|number} customerId - Identifier of the record to update.
   * @param {object} customerData - Source fields in camelCase.
   * @returns {Promise<object>} The parsed response of the update request.
   */
  async updateCustomer(customerId, customerData) {
    const payload = {
      first_name: customerData.firstName,
      last_name: customerData.lastName,
      phone: customerData.phone,
      company: customerData.company,
      custom_fields: {
        last_activity: new Date().toISOString(),
        total_orders: customerData.totalOrders,
        total_spent: customerData.totalSpent,
        preferred_language: customerData.language
      }
    };

    return await this.makeRequest('PUT', `/customers/${customerId}`, payload);
  }

  /**
   * Whether a failed attempt is worth repeating.
   * Errors without an HTTP status (network or parse failures) are retried,
   * as are 408, 429 and 5xx. Other 4xx responses will fail the same way again.
   *
   * @param {Error & {status?: number}} error - Error from one attempt.
   * @returns {boolean}
   */
  isRetryable(error) {
    const status = error?.status;
    if (status === undefined) {
      return true;
    }
    return status === 408 || status === 429 || status >= 500;
  }

  /**
   * Send a JSON request, retrying transient failures with a linearly growing delay.
   *
   * @param {string} method - HTTP method.
   * @param {string} endpoint - Path appended to the base URL.
   * @param {object|null} [payload=null] - JSON body, omitted when falsy.
   * @returns {Promise<object>} Parsed JSON response.
   * @throws {Error} Immediately for non-retryable HTTP errors (with `status`
   *   set); after all attempts otherwise, with the last failure as `cause`.
   */
  async makeRequest(method, endpoint, payload = null) {
    const url = `${this.baseUrl}${endpoint}`;

    const options = {
      method: method,
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${this.apiKey}`,
        'User-Agent': 'N8N-CRM-Integration/1.0'
      }
    };

    if (payload) {
      options.body = JSON.stringify(payload);
    }

    let lastError;

    for (let attempt = 1; attempt <= this.retryAttempts; attempt++) {
      try {
        console.log(`${method} ${url} (尝试 ${attempt}/${this.retryAttempts})`);

        const response = await fetch(url, options);

        if (!response.ok) {
          const httpError = new Error(`HTTP ${response.status}: ${response.statusText}`);
          httpError.status = response.status;
          throw httpError;
        }

        return await response.json();

      } catch (error) {
        // A bad request or missing permission will not improve on retry;
        // surface it right away instead of repeating the call.
        if (!this.isRetryable(error)) {
          throw error;
        }

        lastError = error;
        console.warn(`尝试 ${attempt} 失败:`, error.message);

        if (attempt < this.retryAttempts) {
          const delay = this.retryDelay * attempt;
          console.log(`等待 ${delay}ms 后重试`);
          await new Promise(resolve => setTimeout(resolve, delay));
        }
      }
    }

    // Keep the underlying failure reachable for callers and logs.
    throw new Error(`所有重试尝试失败: ${lastError?.message}`, { cause: lastError });
  }
}

// 主处理逻辑
const crmIntegration = new CRMIntegration();
const result = await crmIntegration.syncCustomerData($json);

return [{ json: result }];

🎯 最佳实践

1. API 设计原则

  • 使用 RESTful 设计
  • 提供清晰的错误消息
  • 实现幂等性操作
  • 支持批量操作

2. 安全考虑

  • 始终使用 HTTPS
  • 实现请求签名验证
  • 设置 IP 白名单
  • 使用短期 Token

3. 性能优化

  • 实现请求缓存
  • 使用连接池
  • 合理设置超时时间
  • 避免过度请求

4. 错误处理

  • 实现重试机制
  • 提供降级方案
  • 记录详细日志
  • 监控 API 健康状态

🚀 下一步学习

掌握 Webhook 和 API 集成后,继续学习:

  1. 定时任务 - 精确控制执行时间
  2. 监控告警 - 监控集成状态
  3. 安全最佳实践 - 保障集成安全

Webhook 和 API 集成是连接外部世界的桥梁。通过掌握这些技术,你能够构建强大的系统集成解决方案,让 N8N 成为你数字化生态系统的中枢!