搜索算法测试入门

🧠 搜索球队算法实现详解(工程级)


📌 目标回顾

用户输入一段关键词,如 real, 皇马, rmad, 马德里,系统应:

  • 快速从球队库中匹配最相关的球队
  • 支持拼音、别名、模糊拼写、简称
  • 排序返回 Top-N 最优匹配项

🧩 总体流程

graph TD
    A[用户输入关键词q] --> B[标准化预处理]
    B --> C[生成特征Token:简拼/全拼/别名/分词]
    C --> D[构建候选池]
    D --> E[匹配算法:编辑距离 + BM25/Tfidf]
    E --> F[综合排序打分]
    F --> G[返回Top-N球队]

🧱 Step 1:标准化 + Token生成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import re
from pypinyin import lazy_pinyin, Style

def normalize_query(q: str):
"""清洗搜索关键词,去空格、统一大小写"""
q = q.lower().strip()
q = re.sub(r'[^\u4e00-\u9fa5a-z0-9]', '', q)
return q

def generate_tokens(q: str):
"""生成多种token形式:全拼、简拼、分词"""
full_pinyin = ''.join(lazy_pinyin(q))
simple_pinyin = ''.join(lazy_pinyin(q, style=Style.FIRST_LETTER))
return {
'original': q,
'full_pinyin': full_pinyin,
'simple_pinyin': simple_pinyin,
}

📦 Step 2:构建候选池(离线构建 + 内存加载)

每支球队结构如下:

1
2
3
4
5
6
7
8
9
10
11
team_index = [
{
"team_id": 1001,
"name_zh": "皇家马德里",
"name_en": "Real Madrid",
"alias": ["real", "rmad", "皇马"],
"full_pinyin": "huangjiamadelie",
"simple_pinyin": "hjmdl"
},
...
]

建议提前用 Redis 缓存简拼索引,提高首次命中率。


🔍 Step 3:匹配算法核心实现

方法1:编辑距离(Levenshtein)

1
2
3
4
5
6
7
import Levenshtein

def fuzzy_score(a: str, b: str):
"""返回字符串相似度,1分为满分"""
if not a or not b:
return 0
return 1 - Levenshtein.distance(a, b) / max(len(a), len(b))

方法2:TF-IDF 向量相似度

用于处理多词场景,如 “manchester united”

1
2
3
4
5
6
7
8
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

def tfidf_match(query, docs):
vectorizer = TfidfVectorizer()
tfidf = vectorizer.fit_transform([query] + docs)
sim = cosine_similarity(tfidf[0:1], tfidf[1:]).flatten()
return sim

📊 Step 4:综合评分逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def score_team(query_tokens, team):
"""对一支球队进行多种字段匹配,并计算最终得分"""
scores = []

for key in ['name_zh', 'name_en', 'full_pinyin', 'simple_pinyin']:
if key in team:
scores.append(fuzzy_score(query_tokens['original'], team[key]))
scores.append(fuzzy_score(query_tokens['full_pinyin'], team[key]))
scores.append(fuzzy_score(query_tokens['simple_pinyin'], team[key]))

# 别名匹配
for alias in team.get('alias', []):
scores.append(fuzzy_score(query_tokens['original'], alias))

return max(scores) # 或使用加权平均

🧪 Step 5:主函数 + 测试样例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def search_teams(query, index, top_n=5):
q_norm = normalize_query(query)
q_tokens = generate_tokens(q_norm)

scored = []
for team in index:
score = score_team(q_tokens, team)
if score > 0.3:
scored.append((score, team))

scored.sort(key=lambda x: x[0], reverse=True)
return [team for _, team in scored[:top_n]]

# 示例:
results = search_teams("皇马", team_index)
for r in results:
print(f"✅ {r['name_zh']} - {r['name_en']}")

🚀 提升建议(大数据版本)

  1. 替换为 ElasticSearch 词向量匹配,实时搜索更强
  2. 引入 Query 重写器(Spell Correction)
  3. 用户画像词权重(基于行为分数加权)

✅ 最终总结

阶段 关键任务
标准化输入 清洗、拼音提取、编码统一
候选匹配 简拼、全拼、别名、中文名等多维匹配
相似度计算 Levenshtein 编辑距离、TF-IDF 语义匹配
排序与过滤 按分数降序过滤、返回Top-N

🧠 搜索球队算法(升级版)- 接入 ElasticSearch 完整实现 + 架构说明 + 测试方法


🌐 一、整体架构图(引入 ES)

graph TD
  A[用户输入关键词] --> B[标准化 + 拼音/简拼生成]
  B --> C[ES 查询构建器(Query Builder)]
  C --> D[ElasticSearch 执行查询]
  D --> E[获取初步结果 Top-K]
  E --> F[后处理:拼音/别名/语义二次打分]
  F --> G[Top-N结果返回前端展示]

🧩 二、ElasticSearch 在架构中的角色

模块 说明
ES 索引(Index) 存储所有球队信息,字段包括原名、拼音、别名、英文名等
Query DSL 用于构建多字段、模糊、权重查询
初筛任务 使用match,multi_match,should查询快速拉取 200 个 Top 候选
性能优化 借助倒排索引 + 分词器,轻量搜索毫秒级返回

🔌 三、数据库如何与 ElasticSearch 交互

1️⃣ 索引结构设计(Mapping)

1
2
3
4
5
6
7
8
9
10
11
12
13
PUT /team_index
{
"mappings": {
"properties": {
"team_id": { "type": "keyword" },
"name_zh": { "type": "text", "analyzer": "ik_max_word" },
"name_en": { "type": "text" },
"alias": { "type": "text", "analyzer": "ik_smart" },
"simple_pinyin": { "type": "keyword" },
"full_pinyin": { "type": "keyword" }
}
}
}

建议中文字段用 ik 分词器(可用自定义),拼音字段用 keyword 精确匹配。


2️⃣ 数据同步策略

可选 2 种方式同步数据库到 ES:

🌀 后台定时同步

1
2
3
4
def sync_mysql_to_es():
teams = mysql.query("SELECT * FROM teams")
for team in teams:
es.index(index='team_index', id=team['id'], body=team)

⚡ 实时触发式(如Django + Celery)

监听新增/更新球队时:

1
2
3
@receiver(post_save, sender=Team)
def update_es_index(sender, instance, **kwargs):
es.index(index='team_index', id=instance.id, body=model_to_dict(instance))

🔍 四、ES 查询实现(Python 示例)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

def search_teams_es(query: str, size=20):
"""在ES中搜索球队"""
q = query.strip().lower()

body = {
"query": {
"bool": {
"should": [
{ "match": { "name_zh": q }},
{ "match": { "name_en": q }},
{ "match": { "alias": q }},
{ "term": { "simple_pinyin": q }},
{ "term": { "full_pinyin": q }}
]
}
},
"size": size
}

resp = es.search(index="team_index", body=body)
return [hit["_source"] for hit in resp["hits"]["hits"]]

若你要做 Typo 容错或拼写修正,也可引入 fuzzy, match_phrase_prefix, suggest 查询器。


🧪 五、测试方案设计

✅ 1. 单元测试

  • 使用固定的 query 如 "皇马", "real", "rmad", "huangjiamadelie"
  • 验证返回是否包含 Real Madrid 且排序靠前
1
2
3
4
def test_search_real_madrid():
results = search_teams_es("皇马")
names = [r['name_zh'] for r in results]
assert "皇家马德里" in names

✅ 2. 召回率测试(离线评估)

  • 建立一个 query -> ground truth 映射表
  • 测试 N 条 query 的 Top-K 召回是否正确
1
2
3
4
5
{
"皇马": ["皇家马德里"],
"man u": ["曼彻斯特联"],
"acmilan": ["AC米兰"]
}

使用 Python 脚本统计召回率、命中率、排序质量。


🎯 性能测试建议

  • 使用 abwrk 模拟高并发搜索请求,测试 ES 吞吐
  • 每次拉取 20 条候选,后端再执行二次重排打分

⚒️ 工程建议

模块 实践建议
数据建模 拼音字段冗余建模写入 ES
多语言支持 name_en、alias 等字段提前预处理
SpellCheck 可用elasticsearch-suggest模块集成
搜索日志分析 日志中记录 query + 返回 top3,训练热度模型

🧩 拼音生成器(简拼/全拼)集成


🔧 1. 引入依赖

使用阿里巴巴的 pinyin4j

1
2
3
4
5
<dependency>
<groupId>com.belerweb</groupId>
<artifactId>pinyin4j</artifactId>
<version>2.5.0</version>
</dependency>

🧠 2. 拼音工具类实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class PinyinUtil {

// 获取全拼,如:北京 → beijing
public static String getFullPinyin(String chinese) {
StringBuilder sb = new StringBuilder();
for (char c : chinese.toCharArray()) {
if (Character.toString(c).matches("[\\u4E00-\\u9FA5]+")) {
String[] pinyinArray = PinyinHelper.toHanyuPinyinStringArray(c);
if (pinyinArray != null) {
sb.append(pinyinArray[0].replaceAll("\\d", ""));
}
} else {
sb.append(c);
}
}
return sb.toString().toLowerCase();
}

// 获取简拼,如:北京 → bj
public static String getSimplePinyin(String chinese) {
StringBuilder sb = new StringBuilder();
for (char c : chinese.toCharArray()) {
if (Character.toString(c).matches("[\\u4E00-\\u9FA5]+")) {
String[] pinyinArray = PinyinHelper.toHanyuPinyinStringArray(c);
if (pinyinArray != null && pinyinArray.length > 0) {
sb.append(pinyinArray[0].charAt(0));
}
} else {
sb.append(c.charAt(0));
}
}
return sb.toString().toLowerCase();
}
}

📝 3. 实体保存或更新时自动生成拼音字段

1
2
3
4
5
public Team preProcessTeam(Team team) {
team.setSimplePinyin(PinyinUtil.getSimplePinyin(team.getNameZh()));
team.setFullPinyin(PinyinUtil.getFullPinyin(team.getNameZh()));
return team;
}

建议在:

  • 后台录入
  • 数据库定时同步
  • Kafka 消费端

统一调用此逻辑。


🔄 实时数据同步机制:MySQL + Kafka + Debezium → ES 自动更新


📦 1. 核心组件说明

组件 作用
MySQL 数据源
Debezium 监听 MySQL binlog,实时捕捉数据变更
Kafka 接收 binlog 事件
Kafka Consumer(Spring) 消费事件、同步数据
Elasticsearch 接收索引更新

🧰 2. Debezium 监听 MySQL 配置(Docker 方式示例)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
version: '3'
services:
zookeeper:
image: zookeeper:3.6
ports: ["2181:2181"]

kafka:
image: confluentinc/cp-kafka
ports: ["9092:9092"]
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092

mysql:
image: mysql:8
environment:
MYSQL_ROOT_PASSWORD: root

debezium:
image: debezium/connect:2.6
ports: ["8083:8083"]
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: my_connect_configs
OFFSET_STORAGE_TOPIC: my_connect_offsets
STATUS_STORAGE_TOPIC: my_connect_statuses

注册 MySQL 监听器(POST 到 http://localhost:8083/connectors):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"name": "mysql-team-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "root",
"database.server.id": "1",
"database.include.list": "sports_db",
"table.include.list": "sports_db.teams",
"database.server.name": "mysqlserver1",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.full"
}
}

Kafka 会推送 mysqlserver1.sports_db.teams 主题。


🧑‍💻 3. Java 消费 Kafka → 自动更新 Elasticsearch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Component
@RequiredArgsConstructor
public class TeamKafkaConsumer {

private final ObjectMapper objectMapper;
private final TeamESRepository teamESRepository;

@KafkaListener(topics = "mysqlserver1.sports_db.teams", groupId = "team-sync")
public void consumeTeamChange(String message) {
try {
JsonNode root = objectMapper.readTree(message);
JsonNode after = root.path("payload").path("after");

if (!after.isMissingNode()) {
TeamDocument doc = new TeamDocument();
doc.setId(after.get("id").asLong());
doc.setNameZh(after.get("nameZh").asText());
doc.setNameEn(after.get("nameEn").asText());
doc.setAlias(after.get("alias").asText());

doc.setSimplePinyin(PinyinUtil.getSimplePinyin(doc.getNameZh()));
doc.setFullPinyin(PinyinUtil.getFullPinyin(doc.getNameZh()));

teamESRepository.save(doc);
}
} catch (Exception e) {
log.error("Failed to sync team from Kafka", e);
}
}
}

🔍 测试建议

  • 使用 Kafka MockServer + 单元测试验证消费是否正确写入 ES。
  • 写入数据库模拟新增球队名,检查 ES 索引是否自动更新。
  • 修改已有球队名,测试拼音字段是否重新生成。

搜索算法测试入门
https://dreamshao.github.io/2025/04/14/搜索算法测试入门/
作者
Yun Shao
发布于
2025年4月14日
许可协议