Webhook与API集成
掌握N8N中的Webhook和API集成技巧,实现与外部系统的无缝连接
Webhook与API集成
Webhook 和 API 集成是 N8N 连接外部世界的核心技术。通过这些技术,你可以让 N8N 接收外部事件、调用外部服务,构建真正的系统集成解决方案。
🔗 Webhook 基础
Webhook 概念
Webhook 是一种"反向 API",允许外部系统在特定事件发生时主动推送数据到你的 N8N 工作流。
// Webhook 工作原理
外部系统事件 → HTTP POST 请求 → N8N Webhook 节点 → 触发工作流执行
Webhook 节点配置
// 基本配置
HTTP Method: POST, GET, PUT, DELETE
Path: /webhook/order-created // 自定义路径
Authentication: None, Basic Auth, Header Auth
Response Mode: "On Received Call", "Last Node"
// 生成的 URL 格式
https://your-n8n.com/webhook/order-created
https://your-n8n.com/webhook-test/order-created // 测试 URL
📥 Webhook 接收数据
处理不同格式的数据
// JSON 数据处理
// Webhook 接收到的数据在 $json 中
{
"event": "order.created",
"orderId": "ORD-12345",
"customer": {
"id": "CUST-67890",
"name": "张三",
"email": "[email protected]"
},
"items": [
{
"productId": "PROD-001",
"quantity": 2,
"price": 299.99
}
],
"total": 599.98,
"timestamp": "2024-01-15T10:30:00Z"
}
// 在后续节点中访问数据
{{ $json.orderId }} // ORD-12345
{{ $json.customer.name }} // 张三
{{ $json.items[0].productId }} // PROD-001
{{ $json.total }} // 599.98
表单数据处理
// Form URL Encoded 数据
// 来自 HTML 表单的数据
name=张三&email=[email protected]&message=产品咨询
// 在 N8N 中访问
{{ $json.name }} // 张三
{{ $json.email }} // [email protected]
{{ $json.message }} // 产品咨询
文件上传处理
// Webhook 配置
Options: {
"binaryData": true // 启用二进制数据处理
}
// 处理上传的文件
// Function 节点
const fileData = $input.first().binary;
if (fileData && fileData.data) {
const fileName = fileData.fileName || 'uploaded_file';
const mimeType = fileData.mimeType;
const fileSize = fileData.data.length;
console.log(`接收到文件: ${fileName}, 类型: ${mimeType}, 大小: ${fileSize} bytes`);
// 保存文件到本地或云存储
const fs = require('fs');
const path = `/uploads/${fileName}`;
// 如果是 base64 编码的数据
const buffer = Buffer.from(fileData.data, 'base64');
fs.writeFileSync(path, buffer);
return [{
json: {
success: true,
fileName: fileName,
filePath: path,
fileSize: fileSize,
uploadedAt: new Date().toISOString()
}
}];
}
🔐 Webhook 安全
签名验证
// Function 节点:验证 Webhook 签名
const crypto = require('crypto');
function verifyWebhookSignature(payload, signature, secret) {
// GitHub 风格的签名验证
const expectedSignature = crypto
.createHmac('sha256', secret)
.update(payload)
.digest('hex');
const providedSignature = signature.replace('sha256=', '');
return crypto.timingSafeEqual(
Buffer.from(expectedSignature, 'hex'),
Buffer.from(providedSignature, 'hex')
);
}
// 获取请求数据
const payload = JSON.stringify($json);
const signature = $request.headers['x-hub-signature-256'];
const secret = $env.WEBHOOK_SECRET;
// 验证签名
if (!signature) {
throw new Error('缺少签名头');
}
if (!verifyWebhookSignature(payload, signature, secret)) {
throw new Error('签名验证失败');
}
console.log('Webhook 签名验证成功');
return [{ json: { verified: true, ...$json } }];
IP 白名单验证
// Function 节点:IP 地址验证
const allowedIPs = [
'192.168.1.0/24', // 内网段
'203.0.113.1', // 特定IP
'140.82.112.0/20' // GitHub IP段
];
function isIPAllowed(clientIP, allowedList) {
const { parse } = require('ipaddr.js');
try {
const client = parse(clientIP);
for (const allowed of allowedList) {
if (allowed.includes('/')) {
// CIDR 格式
const [ip, prefix] = allowed.split('/');
const network = parse(ip);
if (client.match(network, parseInt(prefix))) {
return true;
}
} else {
// 单个 IP
if (client.toString() === allowed) {
return true;
}
}
}
return false;
} catch (error) {
console.error('IP 解析错误:', error);
return false;
}
}
// 获取客户端 IP
const clientIP = $request.headers['x-forwarded-for'] ||
$request.headers['x-real-ip'] ||
$request.connection?.remoteAddress;
if (!isIPAllowed(clientIP, allowedIPs)) {
throw new Error(`IP 地址 ${clientIP} 不在白名单中`);
}
console.log(`IP 验证通过: ${clientIP}`);
return [{ json: { ...$json, clientIP } }];
API Key 认证
// Webhook 节点配置
Authentication: "Header Auth"
Header Name: "X-API-Key"
Header Value: "{{ $env.WEBHOOK_API_KEY }}"
// 或者在 Function 节点中验证
const providedKey = $request.headers['x-api-key'];
const validKey = $env.WEBHOOK_API_KEY;
if (!providedKey || providedKey !== validKey) {
throw new Error('无效的 API Key');
}
🌐 HTTP Request API 调用
基础 API 调用
// HTTP Request 节点配置
Method: POST
URL: https://api.example.com/users
Headers: {
"Content-Type": "application/json",
"Authorization": "Bearer {{ $env.API_TOKEN }}"
}
Body: {
"name": "{{ $json.name }}",
"email": "{{ $json.email }}",
"metadata": {
"source": "n8n",
"timestamp": "{{ new Date().toISOString() }}"
}
}
动态 URL 构建
// 使用表达式构建动态 URL
URL: "https://api.example.com/users/{{ $json.userId }}/orders"
// 或者使用 Function 节点构建复杂 URL
const baseUrl = 'https://api.example.com';
const endpoint = '/search';
const params = new URLSearchParams({
q: $json.searchTerm,
page: $json.page || 1,
limit: $json.limit || 20,
sort: $json.sortBy || 'created_at',
order: $json.sortOrder || 'desc'
});
const fullUrl = `${baseUrl}${endpoint}?${params.toString()}`;
return [{
json: {
url: fullUrl,
originalData: $json
}
}];
请求头管理
// Function 节点:动态请求头
const headers = {
'Content-Type': 'application/json',
'User-Agent': 'N8N-Automation/1.0',
'X-Request-ID': crypto.randomUUID(),
'X-Timestamp': new Date().toISOString()
};
// 根据环境添加认证头
if ($env.NODE_ENV === 'production') {
headers['Authorization'] = `Bearer ${$env.PROD_API_TOKEN}`;
} else {
headers['Authorization'] = `Bearer ${$env.DEV_API_TOKEN}`;
}
// 根据 API 版本添加版本头
if ($json.apiVersion) {
headers['API-Version'] = $json.apiVersion;
}
// 添加自定义业务头
if ($json.tenantId) {
headers['X-Tenant-ID'] = $json.tenantId;
}
return [{
json: {
headers: headers,
requestData: $json
}
}];
🔄 API 响应处理
响应状态处理
// Function 节点:处理不同的响应状态
const response = $input.first().json;
const statusCode = response.statusCode || $json.statusCode;
switch (Math.floor(statusCode / 100)) {
case 2: // 2xx 成功
return handleSuccessResponse(response);
case 3: // 3xx 重定向
return handleRedirectResponse(response);
case 4: // 4xx 客户端错误
return handleClientError(response);
case 5: // 5xx 服务器错误
return handleServerError(response);
default:
throw new Error(`未知的状态码: ${statusCode}`);
}
function handleSuccessResponse(response) {
console.log('API 调用成功');
return [{
json: {
success: true,
data: response.body || response,
statusCode: response.statusCode
}
}];
}
function handleClientError(response) {
const statusCode = response.statusCode;
switch (statusCode) {
case 400:
throw new Error(`请求参数错误: ${response.body?.message || '未知错误'}`);
case 401:
throw new Error('认证失败,请检查 API Token');
case 403:
throw new Error('权限不足,无法访问该资源');
case 404:
throw new Error('请求的资源不存在');
case 429:
throw new Error('请求频率超限,请稍后重试');
default:
throw new Error(`客户端错误 ${statusCode}: ${response.body?.message || '未知错误'}`);
}
}
function handleServerError(response) {
const statusCode = response.statusCode;
const errorMessage = `服务器错误 ${statusCode}: ${response.body?.message || '服务器内部错误'}`;
// 5xx 错误通常可以重试
throw new Error(errorMessage);
}
分页数据处理
// Function 节点:处理分页 API 响应
const allData = [];
let currentPage = 1;
const maxPages = 10; // 防止无限循环
async function fetchAllPages() {
while (currentPage <= maxPages) {
console.log(`获取第 ${currentPage} 页数据`);
const response = await fetch(`${$json.apiUrl}?page=${currentPage}&limit=100`, {
headers: {
'Authorization': `Bearer ${$env.API_TOKEN}`
}
});
const data = await response.json();
if (!data.items || data.items.length === 0) {
console.log('没有更多数据,停止分页');
break;
}
allData.push(...data.items);
// 检查是否还有下一页
if (!data.hasNextPage || currentPage >= data.totalPages) {
console.log('已获取所有页面数据');
break;
}
currentPage++;
// 添加延迟避免 API 限流
await new Promise(resolve => setTimeout(resolve, 100));
}
return allData;
}
const allItems = await fetchAllPages();
console.log(`总共获取 ${allItems.length} 条数据`);
return [{
json: {
totalItems: allItems.length,
items: allItems,
pagesProcessed: currentPage - 1
}
}];
🔧 高级集成技巧
OAuth 2.0 认证流程
// Function 节点:OAuth 2.0 Token 管理
class OAuth2TokenManager {
constructor() {
this.clientId = $env.OAUTH_CLIENT_ID;
this.clientSecret = $env.OAUTH_CLIENT_SECRET;
this.tokenUrl = $env.OAUTH_TOKEN_URL;
this.staticData = $node.getWorkflowStaticData('global');
}
async getValidToken() {
const now = Date.now();
const token = this.staticData.accessToken;
const expiresAt = this.staticData.tokenExpiresAt || 0;
// 检查 token 是否过期(提前 5 分钟刷新)
if (!token || now > (expiresAt - 300000)) {
console.log('Token 已过期或即将过期,刷新 token');
return await this.refreshToken();
}
return token;
}
async refreshToken() {
const refreshToken = this.staticData.refreshToken;
if (!refreshToken) {
throw new Error('缺少 refresh token,需要重新授权');
}
try {
const response = await fetch(this.tokenUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': `Basic ${Buffer.from(`${this.clientId}:${this.clientSecret}`).toString('base64')}`
},
body: new URLSearchParams({
grant_type: 'refresh_token',
refresh_token: refreshToken
})
});
const tokenData = await response.json();
if (!response.ok) {
throw new Error(`Token 刷新失败: ${tokenData.error_description || tokenData.error}`);
}
// 保存新的 token
this.staticData.accessToken = tokenData.access_token;
this.staticData.tokenExpiresAt = Date.now() + (tokenData.expires_in * 1000);
if (tokenData.refresh_token) {
this.staticData.refreshToken = tokenData.refresh_token;
}
console.log('Token 刷新成功');
return tokenData.access_token;
} catch (error) {
console.error('Token 刷新失败:', error);
throw error;
}
}
}
// 使用 Token Manager
const tokenManager = new OAuth2TokenManager();
const accessToken = await tokenManager.getValidToken();
return [{
json: {
accessToken: accessToken,
tokenValid: true
}
}];
API 限流处理
// Function 节点:智能限流处理
class RateLimitHandler {
constructor() {
this.staticData = $node.getWorkflowStaticData('node');
this.requests = this.staticData.requests || [];
this.maxRequestsPerMinute = 60;
this.maxRequestsPerHour = 1000;
}
async executeWithRateLimit(apiCall) {
await this.checkAndWaitForRateLimit();
try {
const result = await apiCall();
this.recordRequest(true);
return result;
} catch (error) {
this.recordRequest(false);
// 检查是否是限流错误
if (this.isRateLimitError(error)) {
console.log('触发 API 限流,等待后重试');
await this.handleRateLimitError(error);
// 重新尝试
const result = await apiCall();
this.recordRequest(true);
return result;
}
throw error;
}
}
async checkAndWaitForRateLimit() {
const now = Date.now();
const oneMinuteAgo = now - 60000;
const oneHourAgo = now - 3600000;
// 清理过期记录
this.requests = this.requests.filter(req => req.timestamp > oneHourAgo);
// 检查每分钟限制
const recentRequests = this.requests.filter(req => req.timestamp > oneMinuteAgo);
if (recentRequests.length >= this.maxRequestsPerMinute) {
const waitTime = oneMinuteAgo - recentRequests[0].timestamp + 1000;
console.log(`达到每分钟限制,等待 ${waitTime}ms`);
await this.sleep(waitTime);
}
// 检查每小时限制
if (this.requests.length >= this.maxRequestsPerHour) {
const waitTime = oneHourAgo - this.requests[0].timestamp + 1000;
console.log(`达到每小时限制,等待 ${waitTime}ms`);
await this.sleep(waitTime);
}
}
recordRequest(success) {
const now = Date.now();
this.requests.push({
timestamp: now,
success: success
});
// 保存到静态数据
this.staticData.requests = this.requests;
}
isRateLimitError(error) {
const message = error.message.toLowerCase();
const statusCode = error.response?.status;
return statusCode === 429 ||
message.includes('rate limit') ||
message.includes('too many requests');
}
async handleRateLimitError(error) {
// 从响应头获取重试时间
const retryAfter = error.response?.headers['retry-after'];
if (retryAfter) {
const waitTime = parseInt(retryAfter) * 1000;
console.log(`API 返回 Retry-After: ${retryAfter}s,等待 ${waitTime}ms`);
await this.sleep(waitTime);
} else {
// 默认等待 60 秒
console.log('使用默认等待时间 60 秒');
await this.sleep(60000);
}
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// 使用限流处理器
const rateLimitHandler = new RateLimitHandler();
const apiCall = async () => {
return await fetch($json.apiUrl, {
method: 'GET',
headers: {
'Authorization': `Bearer ${$env.API_TOKEN}`
}
}).then(response => {
if (!response.ok) {
const error = new Error(`API call failed: ${response.status}`);
error.response = response;
throw error;
}
return response.json();
});
};
const result = await rateLimitHandler.executeWithRateLimit(apiCall);
return [{ json: result }];
批量 API 操作
// Function 节点:批量 API 处理
const items = $input.all();
const batchSize = 10;
const concurrentLimit = 3;
const results = [];
const errors = [];
async function processBatch(batch) {
const promises = batch.map(async (item) => {
try {
const result = await callAPI(item.json);
return { success: true, data: result, originalItem: item.json };
} catch (error) {
return { success: false, error: error.message, originalItem: item.json };
}
});
return await Promise.allSettled(promises);
}
async function callAPI(itemData) {
const response = await fetch(`${$env.API_BASE_URL}/items`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${$env.API_TOKEN}`
},
body: JSON.stringify(itemData)
});
if (!response.ok) {
throw new Error(`API call failed: ${response.status} ${response.statusText}`);
}
return await response.json();
}
// 分批处理
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
console.log(`处理第 ${Math.floor(i/batchSize) + 1} 批,共 ${batch.length} 个项目`);
const batchResults = await processBatch(batch);
batchResults.forEach(result => {
if (result.status === 'fulfilled') {
if (result.value.success) {
results.push(result.value);
} else {
errors.push(result.value);
}
} else {
errors.push({
success: false,
error: result.reason.message,
originalItem: null
});
}
});
// 批次间添加延迟
if (i + batchSize < items.length) {
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
console.log(`批量处理完成: 成功 ${results.length}, 失败 ${errors.length}`);
return [{
json: {
summary: {
total: items.length,
successful: results.length,
failed: errors.length,
successRate: ((results.length / items.length) * 100).toFixed(2) + '%'
},
results: results,
errors: errors
}
}];
🚀 实战案例:完整的第三方集成
CRM 系统集成示例
// Function 节点:CRM 系统完整集成
class CRMIntegration {
constructor() {
this.baseUrl = $env.CRM_BASE_URL;
this.apiKey = $env.CRM_API_KEY;
this.retryAttempts = 3;
this.retryDelay = 2000;
}
async syncCustomerData(customerData) {
console.log(`开始同步客户数据: ${customerData.email}`);
try {
// 1. 检查客户是否已存在
const existingCustomer = await this.findCustomerByEmail(customerData.email);
if (existingCustomer) {
// 2. 更新现有客户
const updatedCustomer = await this.updateCustomer(existingCustomer.id, customerData);
console.log(`客户信息已更新: ID ${updatedCustomer.id}`);
return { action: 'updated', customer: updatedCustomer };
} else {
// 3. 创建新客户
const newCustomer = await this.createCustomer(customerData);
console.log(`新客户已创建: ID ${newCustomer.id}`);
return { action: 'created', customer: newCustomer };
}
} catch (error) {
console.error('客户数据同步失败:', error.message);
throw error;
}
}
async findCustomerByEmail(email) {
const response = await this.makeRequest('GET', `/customers/search?email=${encodeURIComponent(email)}`);
return response.customers && response.customers.length > 0 ? response.customers[0] : null;
}
async createCustomer(customerData) {
const payload = {
first_name: customerData.firstName,
last_name: customerData.lastName,
email: customerData.email,
phone: customerData.phone,
company: customerData.company,
source: 'n8n_automation',
custom_fields: {
registration_date: customerData.registrationDate,
preferred_language: customerData.language || 'zh-CN',
marketing_consent: customerData.marketingConsent || false
}
};
return await this.makeRequest('POST', '/customers', payload);
}
async updateCustomer(customerId, customerData) {
const payload = {
first_name: customerData.firstName,
last_name: customerData.lastName,
phone: customerData.phone,
company: customerData.company,
custom_fields: {
last_activity: new Date().toISOString(),
total_orders: customerData.totalOrders,
total_spent: customerData.totalSpent,
preferred_language: customerData.language
}
};
return await this.makeRequest('PUT', `/customers/${customerId}`, payload);
}
async makeRequest(method, endpoint, payload = null) {
const url = `${this.baseUrl}${endpoint}`;
const options = {
method: method,
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${this.apiKey}`,
'User-Agent': 'N8N-CRM-Integration/1.0'
}
};
if (payload) {
options.body = JSON.stringify(payload);
}
let lastError;
for (let attempt = 1; attempt <= this.retryAttempts; attempt++) {
try {
console.log(`${method} ${url} (尝试 ${attempt}/${this.retryAttempts})`);
const response = await fetch(url, options);
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const data = await response.json();
return data;
} catch (error) {
lastError = error;
console.warn(`尝试 ${attempt} 失败:`, error.message);
if (attempt < this.retryAttempts) {
const delay = this.retryDelay * attempt;
console.log(`等待 ${delay}ms 后重试`);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
throw new Error(`所有重试尝试失败: ${lastError.message}`);
}
}
// 主处理逻辑
const crmIntegration = new CRMIntegration();
const result = await crmIntegration.syncCustomerData($json);
return [{ json: result }];
🎯 最佳实践
1. API 设计原则
- 使用 RESTful 设计
- 提供清晰的错误消息
- 实现幂等性操作
- 支持批量操作
2. 安全考虑
- 始终使用 HTTPS
- 实现请求签名验证
- 设置 IP 白名单
- 使用短期 Token
3. 性能优化
- 实现请求缓存
- 使用连接池
- 合理设置超时时间
- 避免过度请求
4. 错误处理
- 实现重试机制
- 提供降级方案
- 记录详细日志
- 监控 API 健康状态
🚀 下一步学习
掌握 Webhook 和 API 集成后,继续学习:
Webhook 和 API 集成是连接外部世界的桥梁。通过掌握这些技术,你能够构建强大的系统集成解决方案,让 N8N 成为你数字化生态系统的中枢!