数字广告技术全景

数字化广告已经从简单的 Banner 投放演进到了由数据和算法驱动的智能营销系统。本文将系统梳理数字广告技术的核心模块。

广告技术生态

1
2
3
广告主 → DSP → Ad Exchange → SSP → 媒体/App
↑ ↑
└── DMP ─────────┘

核心概念

术语 全称 作用
DSP Demand-Side Platform 广告主端需求方平台
SSP Supply-Side Platform 媒体端供应方平台
DMP Data Management Platform 数据管理平台
Ad Exchange 广告交易平台 连接买卖双方
RTB Real-Time Bidding 实时竞价

RTB 实时竞价流程

一次广告展示的完整流程(发生在 100ms 内):

1
2
3
4
5
6
7
8
1. 用户打开页面
2. SSP 向 Ad Exchange 发送竞价请求(Bid Request)
3. Ad Exchange 转发给多个 DSP
4. DSP 根据用户画像和广告策略出价(Bid Response)
5. Ad Exchange 进行竞价,选出最高出价
6. 胜出的 DSP 返回广告素材
7. 用户看到广告
8. 如果用户点击/转化,触发追踪事件

Bid Request 数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
"id": "bid-request-001",
"imp": [{
"id": "imp-001",
"banner": {
"w": 300,
"h": 250
},
"bidfloor": 0.5
}],
"site": {
"domain": "example.com",
"page": "https://example.com/article"
},
"device": {
"ua": "Mozilla/5.0...",
"ip": "203.0.113.1",
"geo": {
"country": "CN",
"city": "Shenzhen"
}
},
"user": {
"id": "user-abc-123"
}
}

竞价策略实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class BiddingStrategy:
"""广告竞价策略"""

def __init__(self, budget: float, target_cpa: float):
self.budget = budget
self.target_cpa = target_cpa
self.spent = 0.0

def calculate_bid(
self,
predicted_ctr: float,
predicted_cvr: float,
bid_floor: float
) -> float:
"""计算出价"""
# eCPM = pCTR × pCVR × target_CPA × 1000
ecpm = predicted_ctr * predicted_cvr * self.target_cpa * 1000

# 转换为 CPM 出价
bid_price = ecpm / 1000

# 预算控制
if self.spent >= self.budget:
return 0.0

# 确保出价高于底价
if bid_price < bid_floor:
return 0.0

return round(bid_price, 4)

def on_win(self, price: float):
"""竞价胜出时更新花费"""
self.spent += price

CTR 预估模型

点击率预估是广告系统的核心模块。

特征工程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def extract_features(request: dict) -> dict:
"""提取广告请求特征"""
features = {}

# 用户特征
features['user_id'] = request['user']['id']
features['device_type'] = parse_device(request['device']['ua'])
features['geo_city'] = request['device']['geo']['city']
features['hour'] = datetime.now().hour
features['day_of_week'] = datetime.now().weekday()

# 广告位特征
features['site_domain'] = request['site']['domain']
features['ad_size'] = f"{request['imp'][0]['banner']['w']}x{request['imp'][0]['banner']['h']}"

# 上下文特征
features['page_category'] = classify_page(request['site']['page'])

return features

模型训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, log_loss

def train_ctr_model(data, labels):
"""训练 CTR 预估模型"""
X_train, X_test, y_train, y_test = train_test_split(
data, labels, test_size=0.2, random_state=42
)

# LightGBM 模型
train_data = lgb.Dataset(X_train, label=y_train)
valid_data = lgb.Dataset(X_test, label=y_test)

params = {
'objective': 'binary',
'metric': ['binary_logloss', 'auc'],
'learning_rate': 0.05,
'num_leaves': 63,
'max_depth': 7,
'feature_fraction': 0.8,
'bagging_fraction': 0.8,
'bagging_freq': 5,
'verbose': -1
}

model = lgb.train(
params,
train_data,
num_boost_round=1000,
valid_sets=[valid_data],
callbacks=[lgb.early_stopping(50)]
)

# 评估
y_pred = model.predict(X_test)
auc = roc_auc_score(y_test, y_pred)
logloss = log_loss(y_test, y_pred)

print(f"AUC: {auc:.4f}")
print(f"LogLoss: {logloss:.4f}")

return model

归因分析

归因分析解决”用户转化应该归功于哪个广告触点”的问题。

常见归因模型

模型 描述 适用场景
末次点击 转化归功于最后一次点击 效果广告
首次点击 转化归功于第一次点击 品牌广告
线性归因 均匀分配功劳 简单分析
时间衰减 越靠近转化的触点功劳越大 综合评估
数据驱动 用算法计算每个触点的贡献 精准分析

实现 Shapley Value 归因

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from itertools import combinations

def shapley_attribution(touchpoints: list, conversion_func) -> dict:
"""基于 Shapley Value 的归因计算"""
n = len(touchpoints)
attribution = {tp: 0.0 for tp in touchpoints}

for tp in touchpoints:
others = [t for t in touchpoints if t != tp]

for size in range(len(others) + 1):
for subset in combinations(others, size):
subset_list = list(subset)

# 没有该触点时的转化率
without = conversion_func(subset_list)
# 有该触点时的转化率
with_tp = conversion_func(subset_list + [tp])

# 边际贡献
marginal = with_tp - without

# 权重
weight = (factorial(size) * factorial(n - size - 1)) / factorial(n)

attribution[tp] += weight * marginal

return attribution

A/B 测试框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import numpy as np
from scipy import stats

class ABTest:
"""广告 A/B 测试框架"""

def __init__(self, name: str, variants: list[str]):
self.name = name
self.variants = variants
self.data = {v: {'impressions': 0, 'clicks': 0, 'conversions': 0}
for v in variants}

def assign_variant(self, user_id: str) -> str:
"""基于用户 ID 的稳定分流"""
hash_val = hash(f"{self.name}:{user_id}") % 100
variant_index = hash_val % len(self.variants)
return self.variants[variant_index]

def record(self, variant: str, event: str):
"""记录事件"""
self.data[variant][event] += 1

def analyze(self) -> dict:
"""分析测试结果"""
results = {}

control = self.variants[0]
control_data = self.data[control]
control_ctr = control_data['clicks'] / max(control_data['impressions'], 1)

for variant in self.variants[1:]:
variant_data = self.data[variant]
variant_ctr = variant_data['clicks'] / max(variant_data['impressions'], 1)

# 双样本 Z 检验
z_stat, p_value = self._z_test(
control_data['clicks'], control_data['impressions'],
variant_data['clicks'], variant_data['impressions']
)

lift = (variant_ctr - control_ctr) / max(control_ctr, 1e-10)

results[variant] = {
'ctr': variant_ctr,
'lift': f"{lift:.2%}",
'p_value': round(p_value, 4),
'significant': p_value < 0.05
}

return results

@staticmethod
def _z_test(c1, n1, c2, n2):
p1, p2 = c1/n1, c2/n2
p_pool = (c1 + c2) / (n1 + n2)
se = np.sqrt(p_pool * (1-p_pool) * (1/n1 + 1/n2))
z = (p2 - p1) / se
p_value = 2 * (1 - stats.norm.cdf(abs(z)))
return z, p_value

智能投放:AI 驱动的广告优化

预算分配优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def optimize_budget_allocation(
campaigns: list[dict],
total_budget: float
) -> dict:
"""基于历史 ROI 的预算分配优化"""
# 计算每个 campaign 的 ROI
for camp in campaigns:
camp['roi'] = camp['revenue'] / max(camp['cost'], 1)

# 按 ROI 排序
campaigns.sort(key=lambda x: x['roi'], reverse=True)

allocation = {}
remaining = total_budget

for camp in campaigns:
if remaining <= 0:
break

# 基于 ROI 权重分配预算
weight = camp['roi'] / sum(c['roi'] for c in campaigns)
budget = min(total_budget * weight * 1.2, remaining)

allocation[camp['id']] = round(budget, 2)
remaining -= budget

return allocation

广告技术栈推荐

层级 技术
数据采集 Google Tag Manager, Facebook Pixel
数据存储 BigQuery, ClickHouse, Redis
特征工程 Spark, Pandas
模型训练 LightGBM, XGBoost, PyTorch
模型服务 TensorFlow Serving, FastAPI
实时竞价 Go / Rust (高性能)
可视化 Grafana, Metabase
广告平台 Google Ads API, Meta Marketing API

总结

数字化广告营销的技术核心:

  1. 数据驱动:用数据而非直觉做决策
  2. 算法优化:CTR/CVR 预估、竞价策略、预算分配
  3. 实时响应:100ms 内完成竞价和投放决策
  4. 效果归因:科学评估每个渠道的贡献
  5. 持续迭代:A/B 测试验证每一次优化

AI 正在重新定义广告技术 —— 从智能出价到创意生成,从用户理解到效果预测。掌握这些技术,你就掌握了数字增长的密码。


这是 Mosaic Lab 广告技术系列的第一篇,后续会深入探讨每个模块的实现细节。

有问题欢迎评论交流!