普通视图

发现新文章,点击刷新页面。
昨天以前思为说

Fusion GraphRAG Introduced

2025年2月28日 21:33

本文分享我们团队在 GraphRAG 领域的探索与实践历程,介绍一下 FusionGraphRAG。

本文将分享我们团队在 GraphRAG 领域的探索与实践历程。GraphRAG 是检索增强生成(RAG)技术的高级技术之一,旨在解决传统 RAG 的局限性。上周,我在 Twitter 上分享了关于 GraphRAG Indexing 模型蒸馏训练的工作,收获了诸多积极反馈。本文应朋友们建议整理而成,涵盖我们自 2023 年 6 月以来在 GraphRAG 基础研究上的进展,以及对高级 RAG 和 Fusion GraphRAG 的理解与实践。

0.1 RAG 与 GraphRAG 的缘起

0.1.1 RAG:检索增强生成

过去两年来,生成式 AI 和大语言模型的飞速发展堪称人类技术史上的重要里程碑。它们赋予计算机前所未有的上下文学习和推理能力,使得许多专属人类的复杂任务得以自动化甚至超越。然而,如何高效利用这些模型完成具体任务,成为技术落地的关键。

RAG(Retrieval Augmented Generation,检索增强生成)作为一种基础范式应运而生。其核心在于通过“检索”(Retrieval)增强“生成”(Generation):

  • 生成:将任务描述和领域知识(即提示词)输入大语言模型,生成相应的回答。
  • 检索增强:利用检索技术为模型提供高度相关的上下文知识,从而提升生成质量。

在 RAG 中,检索环节至关重要。根据应用场景的不同,RAG 可分为两类:

  1. 面向公共检索系统:如 Google 或学术搜索引擎,依赖大模型调用外部 API,适用于通用知识获取。市面上的代表性产品包括 Perplexity Search 和 DeepSeek。
  2. 面向私有领域知识:如企业内部的研发文档或研究报告,需结合智能文档处理(IDP)、知识索引(Index)和知识管理系统。此类场景对检索技术的精准性和深度提出了更高要求。

0.1.2 朴素的 RAG

RAG_gif

对于非结构化或半结构化知识,朴素 RAG 的典型流程包括:

  • 将文档分割成块(Chunk)。
  • 通过向量化嵌入(Embedding)构建语义索引,或结合 BM25 全文索引实现检索。
  • 在任务执行时,基于任务需求直接检索或通过检索重写(如 HyDE)优化结果。

然而,这种方法在知识组织上的局限性使其难以满足企业级需求,具体挑战将在下文详述。

0.1.3 RAG 的挑战

朴素 RAG 在实际应用中面临以下难题:

  • 大海捞针:分块假设知识分布均匀,但某些关键信息可能被稀释在无关内容中,导致召回失败。
  • 穿针引线:线性分割破坏了知识间的关联,难以还原全局上下文。
  • 宏观问题:基于分片和关键词的索引无法有效应对全局性问题,如“哪些话题最具创新性”。
  • 文档理解不足:缺乏智能文档处理时,公式、表格等复杂信息无法被正确解析和索引,影响回答质量。

0.1.4 高级 RAG

做好 RAG 并非一蹴而就,而是一个持续探索的过程。本质上,我们需要将对应用特点、RAG 技术特性以及知识本身的深刻理解,融入到 RAG 流水线的设计之中。

下图展示了一个近期发布的高级 RAG 总结性工作的简化图例:一个高级 RAG

Attachment.png

可见,在现代高级的 RAG 方法中,数据持久化层包含 “Graph DB”(即图数据库,一种以图结构——“节点”和“边”为中心的数据库),而图数据库也正是本文主题 GraphRAG 的重要基础设施。

现代图数据库能够以高效、可扩展的方式驱动多租户、超大数据规模、高并发场景下复杂关联关系(图数据)的持久化、查询、访问和计算。它是高级 RAG 系统、智能体记忆系统(Agent Memory)的重要骨干基础设施,如下图所示。面向关联关系的图状结构,与我们梳理知识的脑图、甚至大脑中的神经元结构都非常相似。这种并非偶然的结构相似性,正是高级 RAG 知识索引解决朴素 RAG 方法挑战的关键。

graph_gif

常见的高级 RAG 方法,在特别的索引结构上,包括树状索引(Tree-RAG)、分层检索(Raptor)和图状索引(GraphRAG),其中 GraphRAG 是我们文章的重点。

0.1.5 GraphRAG

GraphRAG 这个方法最初是我们在 2023 年 8、9 月份开始,在 LlamaIndex 社区上持续做的一些工作和两次公开的 Webinar 上提出,并给出了开源的实现和可复现的效果比较试验。

在最初的 GraphRAG 参考实践中,我们给出了索引期基于大模型的同构图谱抽取的实现和查询区基于实体、关系语义的捞取与可配置的子图查询的知识召回,并比较了基于 Text-to-Query 方法的一些效果比较与结合方案的实现。现在,为了区隔之后 Graph-based RAG 方法的演进,我们把最初的 GraphRAG 叫做 SubGraph RAG。

在当时的 workshop 上,我们分享过的两个观点是

  • 在与知识打交道的过程中,只用分片处理的方法、绕过 Graph(Knowledge Graph)一定是不够的,这也是我们做 GraphRAG 探索的初衷
  • 这个 GraphRAG 的方法仅仅是一个开始,我们在不断探索更多在 Graph 之上的探索、推理的策略

随后,我们和 Researcher: Diego 一起讨论,做了图索引之上 Chain of Exploration 的工作,并在 PyCon China 2023 上第一次给出了做了 Chain of Exploration 的分享。

之后,我们看到了非常多的 GraphRAG 相关工作被发表,比如 MIT 材料科学的两篇 利用我们的工作的 MechGPT、海马体 RAG 利用 pagerank 权重的工作 HippoRAGLinkedIn 的利用 半结构化数据图建模的 RAG工作都非常有启发性。

MechGPT

当然,最让我们兴奋的工作是微软的 GraphRAG: From Local to Global 工作,他们充分探索了基于大模型抽取的知识图谱索引之上利用社区发现算法得到分级聚集结构下的全局性摘要,这些摘要是没有偏见的保有原始知识中天然聚集性质、重要性质的脉络知识,在回答带有宏观全局性的问题中效果非常好。

ms_paper

我们从这些后续的工作中受益匪浅,在与我们的客户 Graph based RAG 落地工作中,不断打磨、探索、累积了很多各个环节的优化、方法、策略。最终形成了我们称为 Fusion GraphRAG 的方案。

0.2 Fusion GraphRAG

Fusion GraphRAG 是我们团队在 GraphRAG 基础上的创新实践。它融合了高级 RAG 技术,通过图状结构存储文档层级、章节关系及特殊元素(如公式、表格),实现高效、灵活的检索。

FusionGraphRAG_GIF

Fusion GraphRAG 的本质在于 Sota 的高级 RAG 方法融合、充分连接的元知识索引、充分打磨调优的 GraphRAG(可选索引)。

  • Fusion GraphRAG 通过在一个联通图谱内的“元知识”索引,清晰地揭示海量知识文档的内在关联,呈现从文件夹、文档、章节到段落、图表、公式的完整脉络,此为知识图谱的“元知识频谱”。

  • 在此基础上,用户可选择不同粒度的知识抽取方法(非必须),构建图谱结构的图索引,形成“增强图频谱”。

  • 进一步,用户可以对图索引和元知识层进行诸如图摘要、权重分配、时序/状语信息补充等增强操作,以提升知识检索和利用的效率。

附图 FusionGraphRAG 结构实例

graph TD

%% ========= Style =========
classDef metaLayer fill:#f9f9f9,stroke:#b3b3b3,stroke-width:1px,font-size:13px;
classDef graphLayer fill:#c6dcff,stroke:#3f6fd9,stroke-width:1.4px,font-size:13px;
classDef graphSubLayer fill:#e8f1ff,stroke:#3f6fd9,stroke-width:1px,font-size:13px;
classDef enrichLayer fill:#fff9e8,stroke:#cfa514,stroke-width:1px,stroke-dasharray:4 4,font-size:13px;
classDef nodeText font-size:13px;
classDef queryFlow stroke:#444,stroke-dasharray:5 5;

%% ========= 元知识层 =========
subgraph MetaLayer["元知识层"]
  F1["📂 发动机控制项目"]
  F2["📂 高空失效案例库"]
  D1["📄 ECU 设计说明"]
  D2["📄 起动异常日志"]
  D3["📄 案例:高原起动失败"]
  F1 --> D1
  F1 --> D2
  F2 --> D3
end

%% 文档结构
D1 --> S1A["系统简介"]
S1A --> P1["1.1 接口说明"] --> L11["段落 1"] --> L12["段落 2"] --> L13["段落 3"]
S1A --> P2["1.2 电压异常"] --> L21["段落 1"] --> L22["段落 2"]
D1 --> S2A["硬件配置"]
D2 --> S1B["故障描述"] --> B11["1.1 现象"]
S1B --> B12["2.1 日志片段"]
D2 --> S2B["操作记录"]
D3 --> S1C["案例背景"] --> C11["1.1 4000 m 海拔"]
S1C --> C12["2.1 逻辑异常"]
D3 --> S2C["初步分析"]

%% 多模态结构
CH1["图:电压‑转速曲线"]
TB1["表:指令时序"]
FM1["公式:Q = αP√(T/R)"]
L12 --> CH1
L21 --> TB1
L22 --> FM1

%% ========= 图谱层 =========
subgraph GraphLayer["图谱层"]
  subgraph G1 ["电压故障社区"]
    Fault1["故障:ECU 电压低"]
    Sym1["症状:无法启动"]
    T1["时点:2023‑07‑22"]
    Env1["环境:4000 m"]
  end
  subgraph G2 ["操作流程社区"]
    Proc1["流程:更换模块"]
    Act1["措施:旁路供电"]
    Role1["角色:维修工程师"]
  end
end

class G1,G2 graphSubLayer

CH1 --> Fault1
TB1 --> Proc1
FM1 --> Proc1
L13 --> Fault1
L22 --> Proc1
B11 --> Sym1
B12 --> Act1
C11 --> Env1
Fault1 -->|引发| Sym1
Sym1 -->|定位| Fault1
Proc1 -->|解决| Fault1
Fault1 -->|发生于| T1
Proc1 -->|执行| Role1
Sym1 -->|受环境| Env1
Act1 -->|关联| Proc1

%% ========= 知识增强层 =========
subgraph EnrichLayer["知识增强层"]
  Sum1['社区摘要:电压故障']
  Sum2['社区摘要:操作流程']
  ChapSum1['章节摘要:系统简介']
  SeqEdge['时序增强']
  Rank['重要度评分']
end

G1 -->|摘要| Sum1
G2 -->|摘要| Sum2
SeqEdge --> GraphLayer
S1A --> ChapSum1

%% Style apply
class F1,F2,D1,D2,D3,S1A,S2A,S1B,S2B,S1C,S2C,P1,P2,L11,L12,L13,L21,L22,B11,B12,C11,C12 metaLayer
class CH1,TB1,FM1 metaLayer
class Fault1,Proc1,Sym1,T1,Role1,Env1,Act1 graphLayer
class Sum1,Sum2,ChapSum1,Rank,SeqEdge enrichLayer
class F1,F2,D1,D2,D3,S1A,S2A,S1B,S2B,S1C,S2C,P1,P2,L11,L12,L13,L21,L22,B11,B12,C11,C12,CH1,TB1,FM1,Fault1,Proc1,Sym1,T1,Role1,Env1,Act1,Sum1,Sum2,ChapSum1,Rank,SeqEdge nodeText

0.3 Fusion GraphRAG 实现私域 Deep Research

Fusion GraphRAG 融合了先进的 IDP 技术与 RAG 索引方法,并将提取的多模态知识有机地关联为图状结构,从而精确映射真实世界知识的内在逻辑。依托高性能、支持图语义与向量检索的现代图数据库作为持久化底层,Fusion GraphRAG 能在这一统一图谱上实现面向元知识层次与图谱增强的高级检索。

例如:

  • 多层级知识检索: 通过我们提出的 “Chain of Exploration” 检索器——一个能自主在图谱中探索并查找任务相关知识的智能 Agent,Fusion GraphRAG 可以在同时包含文件夹、文件层级以及章节(附摘要)结构信息的图谱上,既完成传统基于 Chunk 的检索任务,又能支持基于分层结构实现的脑图式 RAG 检索。(注:此方法不依赖昂贵的图谱抽取技术,后续将探讨如何进一步降低抽取成本。)
  • 全局与局部深度分析: 通过引入图抽取与图社区摘要等增强手段,“Chain of Exploration” 能够在图谱上按需执行全局搜索以解决宏观问题,同时进行针对多个知识点的精准局部检索,甚至对涉及复杂关联的深度问题(如关键知识间路径查找)进行分析。
  • 多 Agent 协同: 利用 Fusion GraphRAG 构建多 Agent 系统,可以将一整套算法与行为准则封装为一个专用的 Fusion GraphRAG Agent,此 Agent 可作为另一 RAG Agent 的召回计划生成器,在少而精的专业知识(know-how)构建的 Fusion Graph Index 指引下,自动规划出基于海量知识库的复杂查询方案。这为构建私有领域的自主学习 Deep Research/Action Agent 提供了有力支撑。
  • 私有知识库内的深度研究: 在 Fusion GraphRAG 之上,我们可以轻松实现针对私有知识库的 Deep Researcher。依托天然的知识连接与索引,再结合针对 “Chain of Exploration” Agent 的优化提示工程,以及类似 DeepSeek R1 的推理模型,就能实现类似 Deep Researcher 的 RAG 召回效果,为深度研究提供强大动力。

0.4 回到 GraphRAG 时间线

在我们的视角下,GraphRAG 正在不断演进,下面简要梳理这一发展历程:

  • GraphRAG - 2023-08 NebulaGraph 在 LlamaIndex 社区首次提出并实现了基本的 GraphRAG 模型。
  • GraphRAG with Community Summary - 2024-04 微软在论文中提出了基于社区发现算法实现全局摘要的 GraphRAG 方法,为图谱检索提供了新的思路。
  • Fusion GraphRAG
    • 融合了文档层级、章节层级、以及文中各类特殊元素(如公式、表格、图表)的多频谱混合图状索引;
    • 支持可选的知识抽取与知识增强(包括社区摘要、时序增强等),构建混合层级结构查询、全局图召回、知识频谱图谱召回以及(可选) Chain of Exploration 检索;
    • 同时支持将公共知识图谱、现有知识图谱以及图状数据中的 Chain of Exploration 召回方法进行融合。

0.5 图抽取

需要指出的是,在大模型时代,对原始知识进行类似知识图谱的抽取并非 Fusion GraphRAG 的必选项,因为并非所有 RAG 任务都需要细粒度的“知识预习”。图抽取的代价较高,目前多数公开方案成本也不低。

注:LazyGraphRAG 的相关工作同样值得关注,它提供了另一种优化思路。

附:Fusion GraphRAG 的索引及增强流程示意图

FusionGraphRAG

0.6 让图抽取成为默认策略

高质量的 GraphRAG 索引虽然效果显著,但成本昂贵。我们的终极目标是让图抽取的成本降低到与 Embedding 类似的水平,实现大规模普及。

实际上,AI 技术社区(包括我们自身)已经在朝这一方向不断努力:利用 NER 模型、更小型(例如 Phi 系列)的 LLM 进行 SFT、小规模 LLM 的 few shot 方案、尽可能的可编程方法,甚至融合 NER 与 LLM 的多种方案,都是当前探索的方向。最近,我们也看到了一些令人振奋的曙光!

本质上,我们探索的基于 LLM 的抽取方法都采用了不同形式的 CoT(思维链)策略。近期,DeepSeek-MATH 论文中首次提出的 GRPO 方法在 DeepSeek R0 中用于冷启动获取 Think-aloud(思考过程)的深度 CoT 能力表现非常出色。同时,我们注意到基于 Qwen2.5(小尺寸 0.5b 模型)进行 GRPO 训练,在需要强推理能力的任务(例如数学问题)上,通过大约 200 步训练就能获得令人满意的效果。

结合这些观察,我们开展了如下探索:

  • 数据收集与预处理: 利用 DeepSeek R1,在挑选的最新原始知识数据(晚于模型 cut-off date)的基础上进行抽取任务,并开启 <think> 模式,收集高质量的 CoT 与图抽取的 Ground Truth,筛选掉质量不合格或思考过程过长的样本,作为训练数据。
  • 奖励机制设计: 基于开源社区中针对数学问题推理训练的 GRPO 奖励函数,我们设计了奖励高质量推理过程和高质量抽取结果的奖励函数。这两项奖励均依托高质量、低成本的指令大模型进行评估打分。

在一块 A100 显卡上经过约 200 步训练后,我们初步评估结果显示:基于 Qwen2.5-3b 的小尺寸抽取模型在引入 GRPO 训练后,其性能已经超越了 GPT-4o-mini。以下是各模型的对比结果:

Criterion qwen2.5-3b qwen2.5-3b-GPRO gpt-4o-mini
Accuracy ⭐⭐☆☆☆ ⭐⭐⭐☆☆ ⭐⭐☆☆☆
Completeness ⭐⭐☆☆☆ ⭐⭐⭐⭐☆ ⭐⭐⭐☆☆
Coherence ⭐⭐☆☆☆ ⭐⭐⭐☆☆ ⭐⭐☆☆☆
Relevance ⭐⭐⭐☆☆ ⭐⭐⭐⭐☆ ⭐⭐⭐☆☆

正如当初推出首个 GraphRAG 开源实现时一样,这项工作仅仅是个起点。它证明了图抽取技术成本正在不断降低,未来我们有望大胆应用于更多场景,实现大规模的知识 GraphRAG。与此同时,这个方法也为特定领域范式数据抽取质量的提升提供了经济可行的方案。

训练数据、合成过程以及训练流程均已开源:https://github.com/wey-gu/grpo-graph-extraction 。

training

0.7 RAG 应用落地的挑战

众所周知,RAG 更多的是一种技术和功能,而非一个完整的产品。在企业级应用中,产品形态本身所面临的挑战,往往远超技术或原理上的难题。

举个例子,Excel 电子表格软件凭借其公式计算和数据可视化功能,为会计和企业数据处理领域带来了颠覆性的变革。理论上,只需学习 Excel 中的各类公式和图表功能,就能获得大量实战经验,成为专业的数据分析师。但实际上,我们观察到,大多数企业员工仅仅熟悉 Excel 的基本操作,而许多昂贵且高级的 SaaS 产品,其功能实际上也不过是 Excel 部分能力的延伸。

如果基于 RAG 能力构建的企业产品,只是一个具备强大应用编排和知识库索引能力的“无代码”平台,那么这样的平台很可能只能在企业内部的少部分用户或团队中落地。大部分宝贵的领域知识仍停留在个人或小圈子中,无法以智能高效的方式被全面激活和利用。

为了解决这一问题,我们基于 FusionGraphRAG 与 Agentic RAG 技术,打造了一个全新的高级知识库与低门槛应用平台 —— NebulaGraph AI 应用平台(内部命名为 “Catalyst”,即催化剂)。该平台的核心设计理念包括:

  • 无需构建复杂 Workflow: 用户不需要自行搭建工作流。
  • 无需编写繁琐 Prompt: 用户无需学习复杂的指令语法。
  • 简化知识定义: 用户只需将对私有知识的理解转化为对不同“知识篮子”的定义。
  • 智能对话交互: 用户通过与我们的“元智能体”交流,传达解决问题的方法。

这种设计大幅降低了使用门槛,使得企业内部的知识能够被更高效、更智能地激活与应用,真正实现“插上翅膀”的飞跃。

0.8 AI 应用平台

在这个平台中,企业用户可以将成百上千个文档轻松归入各自的知识“篮子”,并为每个篮子选择适合的“预学习”——即索引模式:

index_selection

  • 知识探索 用户可以随时在各个知识篮子中进行探索,快速定位所需信息,直观理解知识集合内的知识分布、深浅、特种。

GraphExplore

  • 智能应用生成 用户无需亲手编写复杂的 prompt,只需通过对话的方式与平台互动,即可生成各种类型的智能应用、迭代沉淀应用的 prompt。例如:

    • “帮我构建一个投研报告生成助手”

    • “我要做一个 xx 申报助手”

    • “基于我所有的周报知识,构建一个 yy 领域回答助手,我想放到钉钉签名上,免得别人来烦我关于 yy 的问题” ……

    平台内置了生成智能体应用的“元智能体”,让对话即成为开发工具。

MetaAgent

  • 问答搜索应用 平台支持构建问答型搜索应用,帮助用户快速获得精确答案。

search_app

  • 知识溯源 同时,针对问答应用,基于连接的知识溯源,追踪答案的来源,确保信息透明可靠。

search_app_1

0.9 总结

GraphRAG 的发展代表了 RAG 技术从基础检索向高级知识管理的演进。通过解决传统 RAG 在知识稀释、关联丢失和宏观问题上的挑战,GraphRAG 为企业级应用提供了更高效、灵活的解决方案。我们团队开发的 Fusion GraphRAG 融合了智能文档处理(IDP)、多层次知识索引和图状结构,将知识的内在关联映射到检索过程中,显著提升了复杂任务的处理能力。

在实践中,我们通过优化图抽取成本(R1 知识蒸馏、 GRPO 方法、 Qwen2.5-3B 规模上的工作,并开源出来。),让 GraphRAG 技术更具经济可行性,使其能够广泛应用于企业知识管理和智能应用平台。

未来,我们相信 GraphRAG 将成为高级 RAG 和智能体记忆(Agent Memory)的核心基础设施,推动深层研究和智能决策的进一步发展。NebulaGraph AI 应用平台的推出,正是这一愿景的初步实现。

最后,对于 GraphRAG,这是 Sherman 和我们 GenAI team 每一位同学共同相信的观点:

  • 狭义的 “GraphRAG”、甚至 “SubGraph RAG”,在特定场景仍然有优势,我们可以按需应用这个技术组合

  • Deep Research, Deep Search 的形态与 Graph-based RAG 方法,尤其是 Fusion GraphRAG 可以形成很好的互补

  • Graph Infra 是高级 RAG/Agent Memory 中不可绕过的结构与方法(Graph is inevitable in Advanced RAG),是以向量为基础的 RAG 的重要演进与互补

Graph RAG: 知识图谱结合 LLM 的检索增强

2023年8月15日 11:10

本文为大家揭示我们优先提出的 Graph RAG 方法,这种结合知识图谱、图数据库作为大模型结合私有知识系统最新的技术栈,作为之前的图上下文学习、text2cypher 文章的第三篇文章。

本文为大家揭示我们优先提出的 Graph RAG 方法,这种结合知识图谱、图数据库作为大模型结合私有知识系统最新的技术栈,作为之前的图上下文学习、text2cypher 文章的第三篇文章。

1 Graph RAG

第一篇关于上下文学习的博客中我们介绍过, RAG(Retrieval Argumented Generation)这种基于特定任务/问题的文档检索范式中,我们通常先收集必要的上下文,然后利用具有认知能力的机器学习模型进行上下文学习(in-context learning),来合成任务的答案。

借助 LLM 这个只需要“说话”就可以灵活处理复杂问题的感知层,只需要两步,就能搭建一个基于私有知识的智能应用:

  • 利用各种搜索方式(比如 Embedding 与向量数据库)从给定的文档中检索相关知识。
  • 利用 LLM 理解并智能地合成答案。

而这篇博客中,我们结合最新的探索进展和思考,尝试把 Graph RAG 和其他方法进行比较,说得更透一点。并且,我们决定开始用 Graph RAG 这个叫法来描述它。

实际上,Graph RAG,是最先又我和 Jerry Liu 的直播研讨会讨论相关的讨论的 Twitter Thread中提到的,差不多的内容我在 NebulaGraph 社区直播 中也用中文介绍过。

2 在 RAG 中知识图谱的价值

这部分内容我们在第一篇文章中阐述过,比如一个查询:“告诉我所有关于苹果和乔布斯的事”,基于乔布斯自传这本书进行问答,而这个问题涉及到的上下文分布在自传这本书的 30 页(分块)的时候,传统的“分割数据,Embedding 再向量搜索”方法在多个文档块里用 top-k 去搜索的方法很难得到这种分散,细粒的完整信息。而且,这种方法还很容易遗漏互相关联的文档块,从而导致信息检索不完整。

除此之外,在之后一次技术会议中,我有幸和 leadscloud.com 的徐旭讨论之后(他们因为有知识图谱的技术背景,也做了和我们类似的探索和尝试!),让我意识到知识图谱可以减少基于嵌入的语义搜索所导致的不准确性。徐旭给出的一个有趣的例子是“保温大棚”与“保温杯”,尽管在语义上两者是存在相关性的,但在大多数场景下,这种通用语义(Embedding)下的相关性常常是我们不希望产生的,进而作为错误的上下文而引入“幻觉”。

这时候,保有领域知识的知识图谱则是非常直接可以缓解、消除这种幻觉的手段。

3 用 NebulaGraph 实现 Graph RAG

一个简单的 Graph RAG 可以如下去简单实现:

  1. 使用LLM(或其他)模型从问题中提取关键实体。
  2. 根据这些实体检索子图,深入到一定的深度(例如,2)。
  3. 利用获得的上下文利用LLM产生答案。

python

# 伪代码

def _get_key_entities(query_str, llm=None ,with_llm=True):
    ...
    return _expand_synonyms(entities)

def _retrieve_subgraph_context(entities, depth=2, limit=30):
    ...
    return nebulagraph_store.get_relations(entities, depth, limit)

def _synthesize_answer(query_str, graph_rag_context, llm):
    return llm.predict(PROMPT_SYNTHESIZE_AND_REFINE, query_str, graph_rag_context)

def simple_graph_rag(query_str, nebulagraph_store, llm):
    entities = _get_key_entities(query_str, llm)
    graph_rag_context = _retrieve_subgraph_context(entities)
    return _synthesize_answer(
        query_str, graph_rag_context, llm)

然而,有了像 Llama Index 这样方便的 LLM 编排工具,开发者可以专注于 LLM 的编排逻辑和 pipeline 设计,而不用亲自处理很多细节的抽象与实现。

所以,用 Llama Index,我们可以轻松搭建 Graph RAG,甚至整合更复杂的 RAG 逻辑,比如 Graph+Vector RAG

在 Llama Index 中,我们有两种方法实现 Graph RAG:

  • KnowledgeGraphIndex 用来从任何私有数据只是从零构建知识图谱(基于 LLM 或者其他语言模型),然后 4 行代码进行 Graph RAG。

text

graph_store = NebulaGraphStore(
    space_name=space_name,
    edge_types=edge_types,
    rel_prop_names=rel_prop_names,
    tags=tags,
)
storage_context = StorageContext.from_defaults(graph_store=graph_store)

# Build KG
kg_index = KnowledgeGraphIndex.from_documents(
    documents,
    storage_context=storage_context,
    max_triplets_per_chunk=10,
    space_name=space_name,
    edge_types=edge_types,
    rel_prop_names=rel_prop_names,
    tags=tags,
)

kg_query_engine = kg_index.as_query_engine()
  • KnowledgeGraphRAGQueryEngine 则可以在任何已经存在的知识图谱上进行 Graph RAG,不过我还没有完成这个 PR

text

graph_store = NebulaGraphStore(
    space_name=space_name,
    edge_types=edge_types,
    rel_prop_names=rel_prop_names,
    tags=tags,
)
storage_context = StorageContext.from_defaults(graph_store=graph_store)

graph_rag_query_engine = KnowledgeGraphRAGQueryEngine(
    storage_context=storage_context,
)

最后,我做了一个 streamlit 的 demo来比较 Graph RAG 与 Vector RAG,从中我们可以看到 Graph RAG 并没有取代 Embedding、向量搜索的方法,而是增强了/补充了它的不足。

4 text2cypher

基于图谱的 LLM 的另一种有趣方法是text2cypher。这种方法不依赖于实体的子图检索,而是将任务/问题翻译成一个面向答案的特定图查询,和我们常说的 text2sql 方法本质是一样的。

4.1 在 NebulaGraph 上进行 text2cypher

在之前的文章中我们已经介绍过,得益于 LLM,实现 text2cypher 比传统的 ML 方法更为简单和便宜。

比如,LangChain: NebulaGraphQAChainLlama Index: KnowledgeGraphQueryEngine 让我们 3 行代码就能跑起来 text2cypher。

4.2 比较 text2cypher 和 (Sub)Graph RAG

这两种方法主要在其检索机制上有所不同。text2cypher 根据 KG 的 Schema 和给定的任务生成图形模式查询,而SubGraph RAG获取相关的子图以提供上下文。

两者都有其优点,为了大家更直观理解他们的特点,我做了这个 demo 视频:

我们可以看到两者的图查询模式在可视化下是有非常清晰的差异的。

4.3 结合text2cypher的Graph RAG

然而,两者并没有绝对的好与坏,不同场景下,它们各有优劣。

在现实世界中,我们可能并不总是知道哪种方法更有效(好帮助区分应该用哪一种),因此,我倾向于考虑同时利用两者,这样获取的两种检索结果作为上下文,一起来生成最终答案的效果可能是最好的。

具体的实现方法在这个 PR中已经可以做到了,只需要设置with_text2cypher=True,Graph RAG 就会包含text2cypher 上下文,敬请期待它的合并。

5 结论

通过将知识图谱、图存储集成到 LLM 技术栈中,Graph RAG 把 RAG 的上下文学习推向了一个新的高度。它能在 LLM 应用中,通过利用现有(或新建)的知识图谱,提取细粒度、精确调整、领域特定且互联的知识。

请继续关注图谱和LLM领域的更深入的探索和进一步的发展。

题图 prompt: A vast open book serves as the backdrop, with intricately interwoven nodes and lines forming a Graph on its pages. At the center of this graph, there’s a glowing brain symbolizing the Knowledge Graph. Rays of light emanate from the brain, reaching every corner of the graph, mirroring neural connections linking diverse information. On the right side of the illustration, a robotic arm with a pen is swiftly writing, representing the input and output of the AI large language model.

图谱驱动的大语言模型 Llama Index

2023年6月1日 14:52

如何利用图谱构建更好的 In-context Learning 大语言模型应用。

English version

注:本文是我最初以英文撰写的,然后麻烦 ChatGPT 帮我翻译成了英文,翻译的 prompt 是:

text

In this thread, you are a Chinese Tech blogger to help translate my blog in markdown from English into Chinese, the blog style is clear, fun yet professional. I will paste chapters in markdown to you and you will send back the translated and polished version.

1 LLM 应用的范式

作为认知智能的一大突破,LLM 已经改变了许多行业,以一种我们没有预料到的方式进行自动化、加速和启用。每天都会看到新的 LLN 应用被创建出来,我们仍然在探索如何利用这种魔力的新方法和用例。

将 LLM 引入流程的最典型模式之一是要求 LLM 根据专有的/特定领域的知识理解事物。目前,我们可以向 LLM 添加两种范式以获取这些知识:微调——fine-tune和上下文学习—— in-context learning。

微调是指对 LLM 模型进行附加训练,以增加额外的知识;而上下文学习是在查询提示中添加一些额外的知识。我们目前观察到,由于其简单性,上下文学习比微调更受欢迎

在本博客中,我将分享我们在上下文学习方法方面所做的工作。

2 Llama Index:数据与 LLM 之间的接口

2.1 上下文学习

上下文学习的基本思想是使用现有的 LLM(未更新)来处理特定知识数据集的特殊任务。

例如,要构建一个可以回答关于某个人的任何问题,甚至扮演一个人的数字化化身的应用程序,我们可以将上下文学习应用于一本自传书籍和 LLM。在实践中,应用程序将使用用户的问题和从书中"搜索"到的一些信息构建提示,然后查询 LLM 来获取答案。

asciiarmor

┌───────┐         ┌─────────────────┐         ┌─────────┐
│       │         │ Docs/Knowledge  │         │         │
│       │         └─────────────────┘         │         │
│ User  │─────────────────────────────────────▶   LLM   │
│       │                                     │         │
│       │                                     │         │
└───────┘                                     └─────────┘

在这种搜索方法中,实现从文档/知识(上述示例中的那本书)中获取与特定任务相关信息的最有效方式之一是利用嵌入(Embedding)。

2.2 嵌入(Embedding)

嵌入通常指的是将现实世界的事物映射到多维空间中的向量的方法。例如,我们可以将图像映射到一个(64 x 64)维度的空间中,如果映射足够好,两个图像之间的距离可以反映它们的相似性。

嵌入的另一个例子是 word2vec 算法,它将每个单词都映射到一个向量中。例如,如果嵌入足够好,我们可以对它们进行加法和减法操作,可能会得到以下结果:

  • vec(apple) + vec(pie) =~ vec("apple apie")

或者向量测量值 vec(apple) + vec(pie) - vec("apple apie") 趋近于0:

  • |vec(apple) + vec(pie) - vec("apple apie")| =~ 0

类似地,“pear” 应该比 “dinosaur” 更接近 “apple”:

  • |vec(apple) - vec(pear)| < |vec(apple) - vec(dinosaur)|

有了这个基础,理论上我们可以搜索与给定问题更相关的书籍片段。基本过程如下:

  • 将书籍分割为小片段,为每个片段创建嵌入并存储它们
  • 当有一个问题时,计算问题的嵌入
  • 通过计算距离找到与书籍片段最相似的前 K 个嵌入
  • 使用问题和书籍片段构建提示
  • 使用提示查询 LLM

asciiarmor

                  ┌────┬────┬────┬────┐                  
                  │ 1  │ 2  │ 3  │ 4  │                  
                  ├────┴────┴────┴────┤                  
                  │  Docs/Knowledge   │                  
┌───────┐         │        ...        │       ┌─────────┐
│       │         ├────┬────┬────┬────┤       │         │
│       │         │ 95 │ 96 │    │    │       │         │
│       │         └────┴────┴────┴────┘       │         │
│ User  │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─▶   LLM   │
│       │                                     │         │
│       │                                     │         │
└───────┘    ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  └─────────┘
    │          ┌──────────────────────────┐        ▲     
    └────────┼▶│  Tell me ....., please   │├───────┘     
               └──────────────────────────┘              
             │ ┌────┐ ┌────┐               │             
               │ 3  │ │ 96 │                             
             │ └────┘ └────┘               │             
              ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ 

2.3 Llama Index

Llama Index 是一个开源工具包,它能帮助我们以最佳实践去做 in-context learning:

  • 它提供了各种数据加载器,以统一格式序列化文档/知识,例如 PDF、维基百科页面、Notion、Twitter 等等,这样我们可以无需自行处理预处理、将数据分割为片段等操作。
  • 它还可以帮助我们创建嵌入(以及其他形式的索引),并以一行代码的方式存储嵌入(在内存中或向量数据库中)。
  • 它内置了提示和其他工程实现,因此我们无需从头开始创建和研究,例如,用4行代码在现有数据上创建一个聊天机器人

3 文档分割和嵌入的问题

嵌入和向量搜索在许多情况下效果良好,但在某些情况下仍存在挑战,其中之一是可能丢失全局上下文/跨节点上下文。

想象一下,当查询"请告诉我关于作者和 foo 的事情",在这本书中,假设编号为 1、3、6、19~25、30~44 和 96~99 的分段都涉及到 foo 这个主题。则在这种情况下,简单地搜索与书籍片段相关的前 k 个嵌入可能效果不尽人意,因为这时候只考虑与之最相关的几个片段(比如 k = 3),从而丢失了许多上下文信息。

asciiarmor

┌────┬────┬────┬────┐
│ 1  │ 2  │ 3  │ 4  │
├────┴────┴────┴────┤
│  Docs/Knowledge   │
│        ...        │
├────┬────┬────┬────┤
│ 95 │ 96 │    │    │
└────┴────┴────┴────┘

而解决、缓解这个问题的方法,在 Llama Index 工具的语境下,就是创建组合索引综合索引

其中,向量存储(VectorStore)只是其中的一部分。除此之外,我们可以定义一个摘要索引和/或树形索引等,以将不同类型的问题路由到不同的索引,从而避免在需要全局上下文时丧失它。

然而,借助知识图谱,我们可以采取更有意思的方法:

4 知识图谱

知识图谱这个术语最初由谷歌在2012年5月提出,作为其增强搜索结果和向用户提供更多上下文信息的努力的一部分。知识图谱旨在理解实体之间的关系,并直接提供查询的答案,而不仅仅返回相关网页的列表。

知识图谱是一种以图形格式组织和连接信息的方式,其中节点表示实体,边表示实体之间的关系。图形结构允许高效地存储、检索和分析数据。

它的结构如下图所示:

那么知识图谱到底能怎么帮到我们呢?

5 嵌入和知识图谱的结合

这里的基本思想是,作为信息的精炼格式,知识图谱可以以比我们对原始数据/文档进行的分割更小的粒度进行查询/搜索。因此,通过不替换大块的数据,而是将两者结合起来,我们可以更好地搜索需要全局/跨节点上下文的查询。

请看下面的图示,假设问题是关于 x 的,所有数据片段中有20个与它高度相关。现在,除了获取主要上下文的前3个文档片段(比如编号为 1、2 和 96 的文档片段),我们还从知识图谱中对 x 进行两次跳转查询,那么完整的上下文将包括:

  • 问题:“Tell me things about the author and x”
  • 来自文档片段编号 1、2 和 96 的原始文档,在 Llama Index 中,它们被称为节点 1、节点 2 和节点 96。
  • 包含 “x” 的知识图谱中的 10 个三元组,通过对 x 进行两层深度的图遍历得到:
    • x -> y(来自节点 1)
    • x -> a(来自节点 2)
    • x -> m(来自节点 4
    • x <- b-> c(来自节点 95
    • x -> d(来自节点 96)
    • n -> x(来自节点 98
    • x <- z <- i(来自节点 1 和节点 3
    • x <- z <- b(来自节点 1 和节点 95

asciiarmor

┌──────────────────┬──────────────────┬──────────────────┬──────────────────┐
│ .─.       .─.    │  .─.       .─.   │            .─.   │  .─.       .─.   │
│( x )─────▶ y )   │ ( x )─────▶ a )  │           ( j )  │ ( m )◀────( x )  │
│ `▲'       `─'    │  `─'       `─'   │            `─'   │  `─'       `─'   │
│  │     1         │        2         │        3    │    │        4         │
│ .─.              │                  │            .▼.   │                  │
│( z )─────────────┼──────────────────┼──────────▶( i )─┐│                  │
│ `◀────┐          │                  │            `─'  ││                  │
├───────┼──────────┴──────────────────┴─────────────────┼┴──────────────────┤
│       │                      Docs/Knowledge           │                   │
│       │                            ...                │                   │
│       │                                               │                   │
├───────┼──────────┬──────────────────┬─────────────────┼┬──────────────────┤
│  .─.  └──────.   │  .─.             │                 ││  .─.             │
│ ( x ◀─────( b )  │ ( x )            │                 └┼▶( n )            │
│  `─'       `─'   │  `─'             │                  │  `─'             │
│        95   │    │   │    96        │                  │   │    98        │
│            .▼.   │  .▼.             │                  │   ▼              │
│           ( c )  │ ( d )            │                  │  .─.             │
│            `─'   │  `─'             │                  │ ( x )            │
└──────────────────┴──────────────────┴──────────────────┴──`─'─────────────┘

显然,那些(可能很宝贵的)涉及到主题 x 的精炼信息来自于其他节点以及跨节点的信息,都因为我们引入知识图谱的步骤,而能够被包含在 prompt 中,用于进行上下文学习,从而克服了前边提到的问题。

6 Llama Index 中的知识图谱进展

最初,William F.H.将知识图谱的抽象概念引入了 Llama Index,其中知识图谱中的三元组与关键词相关联,并存储在内存中的文档中,随后Logan Markewich还增加了每个三元组的嵌入。

最近的几周中,我一直在与社区合作,致力于将 “GraphStore” 存储上下文引入 Llama Index,从而引入了知识图谱的外部存储。首个实现是使用我自从 2021 年以来一直在开发的开源分布式图数据库 NebulaGraph。

在实现过程中,还引入了遍历图的多个跳数选项以及在前 k 个节点中收集更多关键实体的选项(用于在知识图谱中搜索以获得更多全局上下文),我们仍在对这些变更进行完善。

引入 GraphStore 后,还可以从现有的知识图谱中进行上下文学习,并与其他索引结合使用,这也非常有前景,因为知识图谱被认为具有比其他结构化数据更高的信息密度。

在接下来的几周里,我将在本博客中更新有关 Llama Index 中的知识图谱相关工作的内容,然后在 PR 合并后,分享端到端的演示项目和教程。请继续关注!

图数据库驱动的基础设施运维示例

2023年1月13日 22:22

图技术在大型、复杂基础设施之中 SRE/DevOps 的实践参考,本文以 OpenStack 系统之上的图数据库增强的运维案例为例,揭示图数据库、图算法的智能运维方法,全流程示例代码开源。

因为有一些还未采用图技术的 DevOps/Infra 领域同学在 NebulaGraph 社区询问参考的案例,我最近试着实践了一下如何利用图的能力与优势去帮助在复杂基础设施上构建辅助运维系统,希望能帮助到感兴趣 Infra Ops 领域、同时感兴趣图数据库、图算法的大家,全过程都是可以复现、并且开源的。

复杂的基础设施运维环境通常包含非常多、不同层面的资源(manifest),为了能够尽量还原真实世界的复杂环境、又保持这个实例项目的复杂度不会失控,我选择了用一个基础设施平台:OpenStack 作为例子。

本文实现了 OpenStack 系统上分别利用 Push 和 Pull 两种模式将资源之中被图谱建模的图中点、边信息加载到 NebulaGraph 里的 Graph ETL 管道的路径。

在图谱之上,本文探索如下用例:

  • 告警、状态的推理与传导;
  • 网络直连与互联关系;
  • 镜像、云盘、快照血缘管理;
  • 高相关性虚机预警;
  • 秘钥泄漏的图上风控分析;
  • 镜像、云盘漏洞范围分析;
  • 宿主机逃离影响范围分析;
  • 脆弱依赖资源检测;

1 试验环境搭建

1.1 背景知识

OpenStack 是一个开源的云计算平台,提供了类似于 AWS 的云服务。它提供了一组可插拔的模块,包括了计算,存储和网络等功能,可以帮助用户构建和管理云环境。OpenStack采用分布式架构,支持多种操作系统和硬件平台,可以在企业级和服务提供商级环境中使用。

OpenStack-overview-diagram-new

最初由 NASARackspace Inc. 发起的 nova (虚拟化计算项目)和 swift (兼容 S3 的对象存储)项目组成,OpenStack 现在由非常多不同的子项目组成:

openstack-map-v20221001

本实验中,我们设计的 OpenStack 主要项目有:

  • Nova 是 OpenStack 的计算服务,用于管理虚拟机。
  • Cinder 是 OpenStack 的块存储服务,用于管理云存储。
  • Neutron 是 OpenStack 的网络服务,用于管理云网络。
  • Glance 是 OpenStack 的镜像服务,用于管理云镜像。
  • Horizon 是 OpenStack 的可视化控制台服务。

除此之外,我还引入了 Vitrage 项目辅助我们收集部分资源数据:

  • Vitrage 是 OpenStack 中的一个高级分析和可视化工具,用于分析和可视化 OpenStack 环境中的资源和事件。它可以汇集来自 OpenStack 各个服务的数据,并使用图形化方式展示出来。Vitrage 发现和诊断问题,提高 OpenStack 环境的可用性和可维护性。

得益于 OpenStack Decouple 的设计理念,Vitrage 可以很容易、无侵入式(只需要修改需要收集的服务两行配置)就可以在 OpenStack 的消息队列中订阅资源信息的 push 消息。

不过比较遗憾的是 Vitrage 这个项目已经有好多个 release cycle 没有什么大的更新了,应该是比较不活跃的状态了,比如在 zed 里,它的 Vitrage Dashboard 作为 Horizon 插件已经无法正常工作了,本实验只利用它的资源收集能力。

1.2 环境准备搭建

1.2.1 NebulaGraph 集群

首次快速试玩安装 NebulaGraph 的话,有这么几个选项:

  • 阿里云上的 NebulaGraph 企业版(带有企业版独有的满血版可视化工具:Explorer,可以图探索、画布内跑图算法),可以获得一个月所有资源免费使用资格:
  • Nebula-Up 一键安装 NebulaGraph 开源社区版本,需要一个带有 Docker、Docker Desktop 环境的机器
  • 有经验的同学可以参考文档进行部署:

1.2.2 OpenStack 集群

注意:如果大家已经有现成的 OpenStack 集群,这一步可以忽略,您只需要再去安装 OpenStack Vitrage 就好了。

本文需要的 OpenStack 集群是一个多机的环境,为此,我准备了在 Linux Server 上利用 Libvirt 和 Linux Bridge 搭建多个虚拟机用来模拟 OpenStack 的物理机,得益于 CPU 的嵌套虚拟化和 qemu,后边我们完全可以在虚拟机搭建的实验环境中模拟可正常工作的 OpenStack nova instance 虚机。

整个流程我都放在 https://github.com/wey-gu/openstack-graph/#environment-setup 这里了,感兴趣的同学可以访问、获取。

虚拟机搭建之后,我们还需要模拟真实的 Infra 环境,创建很多资源:整个过程也在 https://github.com/wey-gu/openstack-graph/#create-resources-on-openstack 有详细列出,想要动手操作的同学可以参考来亲自上手一下。

参考如上步骤操作之后,我们应该可以通过 Horizon Dashboard 查看集群和资源:

我们创建了几个虚拟机:nova_instance

几个网盘,其中四个挂载在不同的虚拟机上

cinder_volume

集群租户的网络拓扑:neutron_topology

我们还能通过 OpenStack Vitrage 的 API/CLI 获得部分主要资源的拓扑:

bash

source openrc admin admin
vitrage topology show --all-tenants

它的结果是一个 JSON,里边已经按照边(links)和点(nodes)序列化图结构的数据了。

json

{
  "directed": true,
  "graph": {},
  "links": [
    {
      "vitrage_is_deleted": false,
      "relationship_type": "contains",
      "source": 0,
      "target": 11,
      "key": "contains"
    },
    {
      "vitrage_is_deleted": false,
      "relationship_type": "contains",
      "source": 0,
      "target": 13,
      "key": "contains"
    },
...
    {
      "vitrage_is_deleted": false,
      "relationship_type": "attached",
      "source": 27,
      "target": 28,
      "key": "attached"
    }
  ],
  "multigraph": true,
  "nodes": [
    {
      "id": "node0",
      "vitrage_type": "nova.host",
      "vitrage_category": "RESOURCE",
      "vitrage_is_deleted": false,
      "update_timestamp": "2023-01-13T08:06:48Z",
      "vitrage_sample_timestamp": "2023-01-13T08:06:49Z",
      "vitrage_is_placeholder": false,
      "vitrage_id": "630b4c2c-5347-4073-91a3-255ec18dadfc",
      "name": "node0",
      "vitrage_cached_id": "d043d278a6a712909e30e50ca8ec2364",
      "is_real_vitrage_id": true,
      "vitrage_aggregated_state": "AVAILABLE",
      "vitrage_operational_state": "OK",
      "vitrage_datasource_name": "nova.host",
      "state": "available",
      "graph_index": 0
    },
    {
      "id": "nova",
      "vitrage_type": "nova.zone",
      "vitrage_category": "RESOURCE",
      "vitrage_is_deleted": false,
      "vitrage_sample_timestamp": "2023-01-12T03:06:48Z",
      "vitrage_is_placeholder": false,
      "vitrage_id": "a1e9c808-dac8-4b59-8f80-f21a90e9869d",
      "vitrage_cached_id": "125f1d8c4451a6385cc2cfa2b0ba45be",
      "is_real_vitrage_id": true,
      "vitrage_aggregated_state": "AVAILABLE",
      "vitrage_operational_state": "OK",
      "state": "available",
      "update_timestamp": "2023-01-12T03:06:48Z",
      "name": "nova",
      "vitrage_datasource_name": "nova.zone",
      "graph_index": 1
    },
...
  "raw": true
}

2 图谱建模

本实验环境中,我们考虑纳入如下资源进入图谱:

  • nova instance: 是Nova服务中的虚拟机实例,每个nova instance都有自己的配置信息(如CPU、内存、磁盘等),有时候我们就叫它 server 或者 VM、虚机。
  • nova host是Nova服务中的物理主机,是nova instance运行的物理环境。nova host上面会运行nova-compute服务,这个服务负责管理和调度nova instance。nova host上面还可能运行其他服务,如网络服务等。
  • nova keypair: 是Nova服务中的密钥对,用于访问nova instance
  • cinder volume: 是Cinder服务中的云存储卷,可以 attach 到nova instance上做为硬盘
  • cinder snapshot: 是Cinder服务中的云存储快照,可以在cinder volume上做快照
  • glance image: 是Glance服务中的镜像,可以作为创建nova instance时候的启动硬盘
  • neutron network: 是Neutron服务中的网络,可以用于配置nova instance的网络连接
  • neutron port: 是Neutron服务中的端口,用来连接nova instance和neutron network之间,在 nova instance 虚拟机上,一个 port 常常对应一个网卡(如果不是 trunk port 的话)。

他们之间的关系如下:

schema_draft

3 基础设施图 ETL

接下来我们解决从基础设施中抽取资源元数据的问题,

3.1 push 模式

这里的 push 指的是基础设施为主语,从资源方向我们的图谱系统主动、事件驱动地发出资源变动的信息。它的好处是资源中的实时性好,但是坏处是依赖基础设施自身,很多非常瘦的、软件定义/可编程程度不高的组件、比如某些硬件设备没有 push 机制,或者像是古老的软件系统不一定能存在 push 的接口,改造起来有侵入性。

前边提及过,OpenStack 自身是存在 Push hook 的机制的,它的子项目 vitrage 就利用这个机制很优雅地收集系统资源、告警等信息进入图中,类似的机制在其他平台中也是可以实现的。

本实验中我们就利用 vitrage 的机制去收集一部分图谱中的资源信息,如下图,可以看到 vitrage 会在 OpenStack message bus 中订阅 nova/cinder/neutron 等服务中的资源时间,把事件传入 Entity Queue,经过处理,存储到 Entity Graph 中。

在此之上,我们可以通过 vitrage API 获取图谱的拓扑,来消费它。

注意:实际上 Vitrage 服务还提供了推理告警、推理状态、定义决策事件的能力,这里我们并没有采用,后边我们在图上做的一些事情甚至还和它的能力有一些重叠。

vitrage_arch

这里我只是用它来展示 push 模式的工作机制,如果没有 Virtrage 这个项目存在,我们也可以比较容易通过 OpenStack 的 oslo.messaging 这个库很容易写出在 Message Bus(可能是 Kafka, RabbitMQ 等不同底层实现)上订阅资源时间的应用,然后把事件通过 Flink/ Kafka/ Pulsar 等方式接驳 NebulaGraph。

因为 Vitrage 的存在,我就偷懒不用去实现这部分逻辑,只消写一小部分代码调用 Vitrage API 取这个数据就可以了,讽刺的是,从这个角度来看,这其实是一种 pull 的模式了,不用拘泥它本质上算是哪一种方式,至少在资源发起测,我们把它当做 push 模式的例子看待吧。

这部分从 Vitrage 抓取的代码我放在 https://github.com/wey-gu/openstack-graph/blob/main/utils/vitrage_to_graph.py 了,调用方式很简单,在有 OpenStack 客户端的环境中,执行它就可以了,比如:

bash

# 连到 node0 上
ssh stack@node0_ip

# 进入 devstack 目录
cd devstack

# 下载 vitrage 中图数据,解析为 NeublaGraph DML/DQL 的工具
wget https://raw.githubusercontent.com/wey-gu/openstack-graph/main/utils/vitrage_to_graph.py

# 执行它
python3 vitrage_to_graph.py

执行之后,会生成如下文件:

  • schema.ngql 图数据的 Schema 定义
  • vertices/ 点数据的文件夹
  • edges/ 边数据的文件夹

3.2 pull 模式

反过来,pull 模式是从资源外部定期或者事件驱动地拉取资源,存入图谱的方式。刚好本实验中 vitrage 抓取的资源是有限的,有一些额外的资源我单独写了 python 的代码来主动全量抓取,pull 模式的好处是对资源方没有任何侵入性,只需要调用它的接口获取信息就可以了,坏处则是有的系统不太容易获得增量变化,可能只能全量去取。

这部分我抓取的关系如下:

  • glance_used_by: image -[:used_by]-> instance (get from instance)
  • glance_created_from: image -[:created_from]-> volume (get from image)
  • nova_keypair_used_by: keypair -[:used_by]-> instance (get from instance)
  • cinder_snapshot_created_from: volume snapshot -[:created_from]-> volume (get from snapshot)
  • cinder_volume_created_from: volume -[:created_from]-> volume snapshot (get from volume)
  • cinder_volume_created_from: volume -[:created_from]-> image (get from volume)

类似的,它的代码放在 https://github.com/wey-gu/openstack-graph/blob/main/utils/pull_resources_to_graph.py 之中,在真实场景下,我们可能会用 Apache Airflow、dagster 甚至是 cron job 等方式定期执行它。

我们手动执行的方式也很简单:

bash

# 连到 node0 上
ssh stack@node0_ip

# 进入 devstack 目录
cd devstack

# 下载抓取 OpenStack 资源,生成 NeublaGraph DML/DQL 的工具
wget https://raw.githubusercontent.com/wey-gu/openstack-graph/main/utils/pull_resources_to_graph.py.py

# 执行它
python3 pull_resources_to_graph.py

执行之后,会生成点、边的 ngql 语句在两个文件夹下:

  • vertices/ 点数据的文件夹
  • edges/ 边数据的文件夹

3.3 加载数据到 NebulaGraph

我们只需要在 NebulaGraph Studio Console, Explorer Console 或者 NebulaGraph 命令行 Console 中执行上边生成的 .ngql 文件就好了:

bash

# DDL from vitrage
cat schema.ngql

# DDL and DML for both push and pull mode data
cat edges/*.ngql
cat vertices/*.ngql

之后,在 NebulaGraph 中我们会有一个叫做 openstack 的图空间,用这个查询可以查到所有数据:

cypher

MATCH (n) WITH n LIMIT 1000
OPTIONAL MATCH p=(n)--()
RETURN p, n

然后渲染在 explorer 中,手动设置一下数据的图标,就可以看到我们 OpenStack 集群里的所有租户的资源图了:

all_graph_view

接下来我们终于可以在图上看看有意思的洞察了。

4 基于图谱的基础设施运维示例

作为非 SRE、DevOps 人员,我尝试藉由自己在 OpenStack 和图技术的理解想象出下边的一些实例,希望能帮助到需要的读者们。

4.1 告警、状态的推理与传导

这部分我收到了 vitrage 项目的启发,参考它们给出的实例文档:这里

借助资源图谱实时图查询、图计算甚至图可视化能力,我们可以在图上推理、传导一些信息,把重要的时间藉由图上组织好的知识分发到需要收到通知的人、组织、系统。

一个简单的例子是,比如我们在 nova host(虚拟机的宿主机、hypervisor 机器,以下简称宿主机),中获得了一个告警、事件的时候,可能是网卡失败、物理硬盘预警、CPU占用过高之类的告警。我们可以借助图谱查询获得所有相关联的虚机,然后把(WARN)级别的告警发出去或者设置它们为(亚健康)的状态。

这样,获得通知的对象,往往是一些用户的系统,就可以根据他们预先定义好的策略做一些自动化运维,或者通知的 hook:

  • 收到“宿主机 CPU 过高”的告警的情形下,可以根据用户自己设定的不同策略把虚机迁移走,或者更高级复杂的撤离方式(开始不接受新的 traffic,创建新的替代 workload,然后 gracefully 关闭这个 workload)
  • “控制面网络故障”告警情况下,这时候往往无法成功进行主机的车里、迁移,故可以考虑触发备份主机、启动新 workload、关机
  • 其他“(亚健康)状态”,可以作为负载层面出问题的根因分析(RCA)依据

下边,我们给出一个在图谱上进行告警、状态传导的查询例子,我们假设 vid 为 node0 的宿主机出现了高 CPU 的告警,则这个查询可以得到所有其上的虚机,获得时间、告警通知列表:

cypher

MATCH (vm:nova_instance)<-[:`contains`]-(host_CPU_high:nova_host)
    WHERE id(host_CPU_high) == "node0"
RETURN vm.nova_instance.name AS VM_to_raise_CPU_alarms

这其中查询的图模式是从 host_CPU_high 这个 nova_host 向外经由 contains 这个关系指向 vm 这个 nova_instance 的:

cypher

(vm:nova_instance)<-[:`contains`]-(host_CPU_high:nova_host)

它的结果是:

VM_to_raise_CPU_alarms
server-4
server-3
server-1
server-0

如果我们把查询改动一下,选择输出全路径,则可以看到这个信息传导的方向:

cypher

MATCH p=(vm:nova_instance)<-[:`contains`]-(host_CPU_high:nova_host)
    WHERE id(host_CPU_high) == "node0"
RETURN p

在 Explorer 中渲染,点击 N 跳检测:

第一个例子比较简单,甚至不是很有必要用图的能力(这种因为一跳查询表结构中也是很轻松地,我们用一两个 nova API call 就可以搞定等价的信息获取了),这里只是一个例子,实际上我们在图上可以做很多更 Graphy(具有图属性的)、复杂、独特的工作,我们慢慢来看。

4.2 网络可达检测

我们来考虑这样的场景,在 OpenStack 中,不同的主机可以连接到相同的子网(VPC),主机也可以连接到多个子网之中,这样,主机之间的网络连通性信息、与网络联通相关的推理、传导都可以在图上进行。

注:在真实世界中,这里可能还要考虑 Security Group、Router、Switch 等因素,本利中我们用到的 OpenStack 是 L2 only 的 Setup,比较简化。

获得与虚机 server_a 同一 VPC 的所有其他虚机看起来很容易表达了:

cypher

MATCH (server_a)--(:neutron_port)--(:neutron_network)--(:neutron_port)--(server_b:`nova_instance`)
    WHERE id(server_a) == "server-0"
RETURN server_b.nova_instance.name AS L2_connected_server

结果如下:

L2_connected_server
server-1

看起来很初级呀,接下来我们再查询与虚机 server_a 同一 VPC、或者有可能通过跨网络虚机而互联的主机的所有其他虚机,这时候,我们除了共享 neutron network(VPC) 的情况,还要查询所有二层直连的虚机可能通过其他 VPC 连出去的的虚机,这里,我们用到了 OPTIONAL MATCH 的表达,表示可能匹配到的模式:

cypher

MATCH (server_a)--(:neutron_port)--(net:neutron_network)--(:neutron_port)--(server_b:`nova_instance`)
    WHERE id(server_a) == "server-0"
OPTIONAL MATCH (server_b)--()--(other_net:neutron_network)--(:neutron_port)--(server_c:`nova_instance`)
    WITH server_a, server_b AS same_subnet_machines, server_c AS routeable_machines WHERE routeable_machines != server_a

RETURN same_subnet_machines.nova_instance.name AS L2_connected_server,
       routeable_machines.nova_instance.name AS cross_vpc_server

可以看到结果里,跨网络潜在的相连主机还有 server-3:

L2_connected_server cross_vpc_server
server-1 server-3

我们将其可视化,同样,修改输出为路径 pp1

cypher

MATCH p=(server_a)--(:neutron_port)--(net:neutron_network)--(:neutron_port)--(server_b:`nova_instance`)
    WHERE id(server_a) == "server-0"
OPTIONAL MATCH p1=(server_b)--()--(other_net:neutron_network)--(:neutron_port)--(server_c:`nova_instance`)
RETURN p, p1

它可能的连接路径一目了然

cross_vpc_vm

有了获得这些信息的能力,我们可以可编程地连接告警、状态、安全风控、网络等方方面面系统了,因为这不是本文的重点,这里就不加以赘述了,欢迎大家来 NebulaGraph 社区分享你们的图洞察使用方式。

接下来我们来看看存储相关的例子。

4.3 镜像、云盘、快照的血缘

在基础设施中,云盘(iSCSI、Ceph、NFS)、镜像、快照之间有多重复杂的关系,比如:

  • 一个系统镜像可能从某一个虚拟机挂载的云盘或者一个快照创建
  • 一个云盘可能是从一个系统镜像、一个快照或者另一个云盘创建
  • 一个快照是从一个云盘创建的

这种血缘信息的识别和管理是很有必要的。下边的查询可以获得给定的虚机 server-0 的所有存储血缘:

cypher

MATCH p=(server_a)-[:`attached`|created_from|used_by]-(step1)
    WHERE id(server_a) == "server-0"
OPTIONAL MATCH p1=(step1)-[:created_from*1..5]-(step2)
    RETURN p, p1

我们可以看到结果中:

  • server-0 的启动镜像(这里它是从本地盘启动的,没有挂载云盘)是从 volume-1 创建的
  • volume-1 是从 cirros-0.5.2-x86_64-disk 这个镜像创建的
  • 此外,还有其他有分叉关系的存储资源和他们也息息相关

storage_lineage_0

接下来,我们不只考虑存储资源,再看看涉及云盘(cinder_volume)挂载(attached)这层关系下的血缘关系:

cypher

MATCH p=(server_a)-[:`attached`|created_from|used_by]-(step1)
    WHERE id(server_a) == "server-4"
OPTIONAL MATCH p1=(step1)-[:created_from|attached*1..5]-(step2)
    RETURN p, p1

这次,我们可以从渲染图中读出这样的洞察:

  • server-4 的启动镜像(这里它是从本地盘启动的)是从 volume-1 创建的
    • volume-1 现在挂载在 server-6
    • volume-1 是从 cirros-0.5.2-x86_64-disk 这个镜像创建的
    • 同样 cirros-0.5.2-x86_64-disk 镜像被很多其他虚机在采用
  • server-4 同时挂载了数据盘 volume-2
    • volume-2 是一个多挂载的盘,它同时挂载在 server-3 之上
    • server-3 的系统启动盘是从快照 snapshot-202301111800-volume-1 克隆创建的
    • 快照 snapshot-202301111800-volume-1 是曾经从 volume-1 创建的
    • volume-1 现在挂载在 server-6
      • 快照不一定是从 server-6 而来,因为镜像可能被重新挂载过

storage_lineage_1

而这些血缘信息可以被用在资源生命周期管理、根因分析、安全告警、状态传导上,这里不加以赘述。

4.4 高相关性虚机预警

下面再给一个节点相似度的应用,我们可以在全图或者子图上,利用图算法找到与一个虚机在图上关系的维度上最相似的其他虚机,基于在这种相关性增加新的关系,并在关系上做风险事件预警。

这次的图算法应用中,我们按照一个典型的从[快速子图验证]到[全图生产应用的]工作流。

4.4.1 在子图上快速验证:浏览器内算法

首先,我们试着从 server-0 的三度子图上做算法的验证。

cypher

GET SUBGRAPH 3 STEPS FROM "server-0"
YIELD VERTICES AS nodes, EDGES AS relationships;

将结果渲染在画布上,我们可以看到子图中包含了其他几个虚机:

server_subgraph

然后,我们利用 explorer 中的浏览器内图算法,可以非常方便地验证我们的想法,这里,我们使用 Jaccard SImilarity 相似性算法,进行 server-0server-1,server-3,server-4,server-6 迭代分别得到相似性:

jacc_sim_browser

可以看出,在 3 步子图内,和 server-0 最近接的虚机是 server-4。进一步我们可以简单在子图上看看两者之间的路径作为相似性的解释:

sim_explain

在这个可解释结果中,我们知道 server-0server-4 相似的原因可能是:

  • 坐落在同一个宿主机:node-0
  • 使用同一个镜像:cirros_mod_from_volume-1

如此,我们最终落地的预警机制可能是,当 server-0 出现某一问题、告警时候,给相似的 server-4 也设定预警,预警理由就是它们在同样主机、同样镜像。

4.4.2 落地算法为应用:Workflow+Analytics

有了前边的快速实验,我们可以借助 workflow + NebulaGraph Analytics 把它落地为全图上的算法,利用 Analytics 分布式能力去执行。

在生产上,我们利用 Workflow 的 DAG 编排能力创建两个前后相连的任务:

  • 取临近虚机
  • 全图算相似度

第一个任务如下,它实时从给定的虚机(这里写死了 server-0,但是 workflow 可以把这里作为参数化,并封装任务为可以被 API 触发的异步服务):

cypher

MATCH (n)-[*1..5]-(m:`nova_instance`)
    WHERE id(n) == "server-0" AND n != m
RETURN distinct id(m)

这里 Query job 我们输出待比较的其他虚机的 vid。

query_sim_server

接着,JaccardSImilarity job 中,我们选择 ids1 为 server-0(这里如上,上线时是参数化的),ids2 从上游取(前边的 Query job),选择在 openstack 全图扫描所有类型的边。jacc_sim_workflow

保存、运行,我们可以看到,结果如下,区别是这次它运算了更多的目标虚机,并且迭代作用范围是全图而非一个子图,可以看到结果是一致的,这是因为子图上关联度大的点和相近的边在 Jaccard 算法里起到了更主要的作用。

jacc_result

4.5 安全相关场景

基础设施资源中的关联关系和金融、内容系统、电商领域的风控场景有相似的地方,很多场景本质上利用到了图谱关系中的知识,在图库上实时获取这些复杂多跳天然带有可解释性的安全洞察非常适合。

4.5.1 秘钥泄漏风控分析

先看一个秘钥泄漏的场景:假设 key-0 被安全部门确定被泄漏了,我们可以在毫秒时间内获得如下查询:

  • 直接采用了密钥的虚机
  • 与采用秘钥的虚机网络直连的机器
  • 与采用秘钥的虚机跨网络相连的机器

cypher

MATCH (key_leaked)-[:`used_by`]->(involved_server:nova_instance)--(:neutron_port)--(net:neutron_network)--(:neutron_port)--(server_b:nova_instance)
       WHERE id(key_leaked) == "key-0"
OPTIONAL MATCH (server_b)--()--(other_net:neutron_network)--(:neutron_port)--(server_c:nova_instance)
    WITH involved_server, server_b AS same_subnet_machines, server_c AS cross_net_machines
        WHERE cross_net_machines != involved_server
RETURN involved_server.nova_instance.name AS with_key,
        same_subnet_machines.nova_instance.name AS l2_vms,
        cross_net_machines.nova_instance.name AS cross_vpc_vms

贴一下部分结果,我们知道 server-4 采用了这个 keypair,并且 server-6 和它在同一个网络,同时,有一定可能,通过 server-6,server-1,2,0,5 也受到了威胁、影响,相关的机器可以被触发不同级别的告警来降低安全事故的影响。

with_key l2_vms cross_vpc_vms
server-4 server-6 server-1
server-4 server-6 server-2
server-4 server-6 server-0
server-4 server-6 server-5

这个查询改造为可视化结果:

cypher

MATCH p=(key_leaked)-[:`used_by`]->(involved_server:nova_instance)--(:neutron_port)--(net:neutron_network)--(:neutron_port)--(server_b:nova_instance)
    WHERE id(key_leaked) == "key-0"
OPTIONAL MATCH p1=(server_b)--()--(other_net:neutron_network)--(:neutron_port)--(server_c:nova_instance)
RETURN p,p1

在 explorer 里 应用 Dagre-LR 的布局,一关联关系很清晰的被展示出来,也许可以考虑把它引用在安全事故的报告分发给虚机租户。

key_leaked

4.5.2 镜像、云盘漏洞范围分析

类似的,一个镜像被扫出漏洞,我们可以瞬间查到涉及到的资源,并做出相应

  • 镜像文件有漏洞

cypher

MATCH p=(image_risky)-[:`created_from`]-(step1)
    WHERE id(image_risky) == "cirros-0.5.2-x86_64-disk"
OPTIONAL MATCH p1=(step1)-[:created_from*1..5]-(step2)
RETURN p, p1

image_vulnerability

  • 一个云盘有漏洞

cypher

MATCH p=(volume_risky)-[:`created_from`]-(step1)
    WHERE id(volume_risky) == "volume-1"
OPTIONAL MATCH p1=(step1)-[:created_from*1..5]-(step2)
RETURN p, p1

volume_vulnerability

4.5.3 潜在宿主机逃离影响范围分析

最后,我们讨论一个比较严重的安全问题:宿主机逃离。

在极端的情况下如果在我们得到消息,server-0 发生了有可能影响宿主机的安全时间的时候,仅仅关闭这个宿主机是不够的,受影响的范围可能已经扩大了,然而,我们不可能因为这样关闭整个机房,所以,利用图谱辅助找出受影响范围会有一些帮助。

下面的查询模式是:

  • 找出可能被影响的子网(VPC),标记最高级别风险子网为后续定位做准备
  • 找到可能被控制了的宿主机
  • 从宿主机触发,找出同主机的其他虚机
  • 从其他虚机触发,找到它们的子网(VPC)
  • 从其他虚机触发,找到可能已经被影响的网盘(防止被挂载到其他机器,扩大影响)

cypher

MATCH (server_escaping_hypervisor)<-[:`contains`]-(hypervisor_compromised:nova_host)
    WHERE id(server_escaping_hypervisor) == "server-0"
OPTIONAL MATCH (server_escaping_hypervisor)<-[:attached]-(:neutron_port)<-[:contains]-(impacted_subnet_high:neutron_network)
OPTIONAL MATCH (hypervisor_compromised)-[:`contains`]->(server_same_host:nova_instance)
OPTIONAL MATCH (server_same_host)<-[:attached]-(:neutron_port)<-[:contains]-(impacted_subnet:neutron_network)
OPTIONAL MATCH (server_same_host)<-[:attached]-(impacted_volume:cinder_volume)

RETURN impacted_subnet_high.neutron_network.name AS impacted_subnet_high,
       hypervisor_compromised.nova_host.name AS hypervisor_compromised,
       impacted_subnet.neutron_network.name AS impacted_subnet,
       [server_same_host.nova_instance.name, server_same_host.nova_instance.instance_name] AS server_same_host,
       impacted_volume.cinder_volume.name AS impacted_volume

结果中列出了 server-0 被控制之后,考虑宿主机逃离的情况下可能受影响的扩散范围。

impacted_subnet_high hypervisor_compromised impacted_subnet server_same_host impacted_volume
shared node0 shared [“server-0”, “instance-00000001”] Empty
shared node0 shared [“server-1”, “instance-00000002”] ffaeb199-47f4-4d95-89b2-97fba3c1bcfe
shared node0 private [“server-1”, “instance-00000002”] ffaeb199-47f4-4d95-89b2-97fba3c1bcfe
shared node0 private [“server-3”, “instance-00000005”] c9db7c2e-c712-49d6-8019-14b82de8542d
shared node0 private [“server-3”, “instance-00000005”] volume-2
shared node0 public [“server-4”, “instance-00000006”] volume-2

咱们再看看它的可视化结果。

cypher

MATCH p=(server_escaping_hypervisor)<-[:`contains`]-(hypervisor_compromised:nova_host)
    WHERE id(server_escaping_hypervisor) == "server-0"
OPTIONAL MATCH p0=(server_escaping_hypervisor)<-[:attached]-(:neutron_port)<-[:contains]-(impacted_subnet_high:neutron_network)
OPTIONAL MATCH p1=(hypervisor_compromised)-[:`contains`]->(server_same_host:nova_instance)
OPTIONAL MATCH p2=(server_same_host)<-[:attached]-(:neutron_port)<-[:contains]-(impacted_subnet:neutron_network)
OPTIONAL MATCH p3=(server_same_host)<-[:attached]-(impacted_volume:cinder_volume)
RETURN p,p0,p1,p2,p3

选择 Dagre 布局之后,可以比较清晰看出影响资源的范围,从这些可能受影响的虚机、网络、网盘出发,可以进一步采取需要的措施了。

hypervisor_escape

4.6 重点关注资源检测

最后,利用 Betweenness Centrality 算法,我们可以得出基础设施中影响面大的那些,”脆弱环节“,这些资源不一定真的处在危险的状态,只是说,它们处在了比较重要的资源之间的交汇处,一旦它们出问题,出问题的代价可能会非常大。

识别出这样的资源之后我们可以考虑:

  • 有针对性采用更激进、昂贵的健康检查策略;
  • 设定更高的支持、关切级别;
  • 主动迁移相关联的资源以降低”脆弱环节“对整体基础设施可用性的影响范围;

这次,我们就只在浏览器内部的子图上做算法流程验证,读者朋友们可以自己试着利用开源的 NebulaGraph Algorithm 或者付费的 NebulaGraph Workflow+Analytics 做全图上的等价操作。

首先,我们在前边用过的方式去扫描图上 1000 个点,并且从其出发,跳一跳,获得一个比较随机的子图,在我们当前的数据集下,这实际上捞取了全图的数据:

cypher

MATCH (n) WITH n LIMIT 1000
OPTIONAL MATCH p=(n)--()
RETURN p, n

在其之上,我们运行 Betweenness Centrality 之后,得到 node0 是分值最大的”脆弱一环“,的确,它是我们当前实验中负载最大的宿主机,可以想象它确实是故障之后全局影响最大的一个资源。

bwteeness_centrality

5 总结

在海量数据、企业云、混合云的复杂基础设施运维场景下,利用图数据库图算法的能力做高效的辅助运维工作是一个十分值得的尝试与技术投资。

NebulaGraph 作为高性能、开源、分布式的新一代云原生图数据库,是一个很值得考虑的图基础设施选型目标。

欢迎大家在文末留言讨论,本文的可复现环境和示例的 ETL 管道的代码、示例数据全都在 https://github.com/wey-gu/openstack-graph/ 开源,欢迎大家来一起完善。

题图版权:Ivan

连接微信群、Slack 和 GitHub:社区开放沟通的基础设施搭建

2022年12月19日 19:45

NebulaGraph 社区如何构建工具让 Slack、WeChat 中宝贵的群聊讨论同步到公共领域

1 要开放,不要封闭

在开源社区中,开放的一个重要意义是社区内的沟通、讨论应该是透明、包容并且方便所有成员访问的。这意味着社区中的任何人都应该能够参与讨论和决策过程,并且所有相关信息应该公开和自由地共享。

在公共场合进行沟通在开源理念中是重要的,正式这种方式使得社区的成员可以进行有效地共同工作,分享想法和反馈,为项目或社区做出贡献。

为了更清楚表达,我举几个反面的例子:

  • 要求贡献者使用对他们来说难以访问或难以使用的工具可能会妨碍开源社区中的开放沟通。 这可能是由于多种原因,例如:

    • 工具可能昂贵或需要许可证,而并非所有贡献者都能负担得起。
    • 工具可能难以使用或需要很高的技术经验积累,而并非所有贡献者都具备。
    • 工具可能在某些操作系统或设备上不兼容,这可能使一些贡献者难以访问它们。
  • 在不与社区其他成员分享上下文、过程或结果的情况下,只在线下(例如通过当面沟通、IM 或电话会议)进行决策可能会使很重要的知识只被少数贡献者掌握。

    这可能会阻止其他人在这些知识之上做贡献或从中学习,阻碍了开源社区所必需的开放沟通和协作。

  • 没有把系统、功能设计和提案信息以公开方式文档化、归档下来,例如只提供某一个公司内网的链接,从而可能伤害开源社区的透明度和包容性。

    因为这样的结果是社区的其他成员很难保持对社区进展的正常了解、就更不用说参与进来做贡献了。为了促进透明度和包容性,开源社区应尽量确保所有重要的信息公开和自由地共享、尽可能保有细节地被公开归档。

2 挑战

为了使社区(或工作环境)的沟通保持透明、高效和健康,其实已经存在一些共识,和通用的做法:

  • 异步优于同步,在分布式和全球协作的情况下,同步通信在大多数情况下成本高且效率低。

    因此,推荐使用 GitHub Discussion 和 StackOverflow 进行提问式的沟通。

  • 专题(Thread)讨论优于广播(Fan out),注意力是宝贵的,向所有人群发最终常常导致重要信息没有人真的读。

    因此,我们在 GitHub Discussion 和 Slack 中设有分类、频道。建立 SIG 来讨论一些有趣的主题(并归档沟通的结果),而不是将所有事情带到社区会议广泛讨论。

  • 优先选择可搜索/文本、版本控制、协作的方式与工具,并在可能的情况下鼓励成员们给其他人反馈;在基础设施上跟踪文档、设计流程,并且提供评论、review 的能力。

    为此,我们用 etherpad.opendev.org 来记录社区会议文档。

但是,就像我们还是需要同步的沟通、有 IM 和会议的需求一样,还是存在一些特例的情况,我们不能盲目追求异步、绝对的开放的,正如前边提到的,能让更多参与者公平、方便与社区连结本身也是开放的一部分,尽管使用的基础设施是可能是封闭的。事实上,几乎所有的开源社区都在用类似的方式建立他们的开源社区沟通平台:

  • Slack 在 IM 消息中中支持丰富的格式化(支持 markdown!)和 Thread 系统,其现代化设计和开放/软定义接口使我们的工作流程可能非常优美流畅。

  • 与 Slack 相比,微信在技术社区中在许多方面都很不理想(只是因为它不是专为这样场景而设计的!),但在国内,它是社区中所有人都可以访问的唯一平台。每个人都有一个微信账号,而只有很少一部分人会每天查邮件。

于是,我们面临的问题是,在 NebulaGraph 社区中有两个平台承担了沟通的重要部分,但这些信息在几个月后就会消失,它们在短时间内只能被割裂的一部分贡献者看到,而未来没有人或其他平台可以读到、搜到和参考、引用这些有价值的讨论。

3 我们摸索的方案

曾经有一段时间,我们会自己手动收集 Slack、微信群里的讨论摘要,定期分享、归档在公共领域,这个方法也确实带来了一些价值,然而,我们最后都没坚持下去,原因很简单:1. 这太费事儿了,完全不 scale;2. 这种摘要其实不好平衡能被归档信息的裁剪程度,有时候细节非常重要却不容易被摘要保留。

3.1 搞定 Slack 的信息孤岛

2022 年 10 月,我注意到了 linen.dev 这个开源项目同时也是一个 SaaS 服务,有了它,我们可以把 Discord 和 Slack 中的每个 thread 保留,它整站看起来和 Discord/ Slack 机会样,但是,它完全是可以匿名被访问、引用和被搜索引擎搜索的。

经过几个月的评估,我们最终决定了订阅 linen.dev 服务。为此,我们可以获得:

  • 不用去碰现有的 Slack,所有 Slack 的好处都能得以保留
  • 有了这样一个社区的站点 https://community-chat.nebula-graph.io/,其中,Slack 中的每个公共频道内容都能被匿名访问、能被搜索引擎收录,而访客还可以很容易知道怎么加入我们的 Slack,如图右上角:

这个站会实时同步 Slack 里的消息,重要的是,它是面向搜索引擎优化的,你可以搜搜 Kotlin 社区通过 linen 被收录的网页有多少,搜这个:“site: slack-chats.kotlinlang.org”

  • 每一个 thread 都有一个无需登录的只读 URL,我们可以方便去分享、引用它,虽然这件事儿本身就是超链接、URL的作用,但是在现在已经变得非常不容易了,比如这个新闻里提到现在新一代的人们更倾向于在抖音里搜索而不是在公共领域里。 有了它,我们可以非常开心地在 GitHub 里引用任意一个 Slack 讨论话题:

解决了 Slack 的问题之后,唯一剩下的痛点就是微信群了,每周都有挺多非常宝贵的讨论在社区群中进行却不能被保留下来,真实太令人心疼了,终于有一天,我们决定把这个问题解决。

3.2 解决微信群的信息公开化

首先,能不能直接用 Linen 一把梭,同步群消息呢?我确实在 Linen 社区和他们的 Kam 讨论直接解决 IM 同步的可能,不过到现在,他们都没有优先考虑😭。

然后,我就在想如果直接把微信同步到 Slack,Linen 不就能把微信的信息也收录了吗?

Twitter 上 求助黑客/开源社区 + 一番调研确定了没有这样的东西存在之后,我决定搞一个,做成开源项目,我花了一点时间实现了最初的版本。

石头汤来了 👉🏻:https://t.co/Fdhm9MkoBb#NebulaGraph 社区微信群现在已经会被同步到 slack 了。

— Wey Gu 古思为 (@wey_gu) December 15, 2022

万万没想到,当我做到把消息从微信同步到 Slack 之后,随之而来的问题是,通过 Slack API 发出的消息 Linen 并不会收录。

为此,我放弃了 linen 一把梭的美好愿望,转而考虑把消息同步到其他公共领域,而我第一个想到的就是 GitHub Discussions 之中,又花了周末的下午加晚上,把它做出来了:

wechat-to-gh-discussion

现在,这个机器人程序会把配置好的微信群消息同时同步到 Slack 频道和 GitHub Discussion 中给定的标签下的主题中,每一个群一个礼拜是一个主题,所有的消息都是主题下的评论。

3.3 小结

现在,我们保留了所有 Slack/微信的美好的一面的同时,把它们中的讨论消息历史全都归档、索引并公开到这两个域之下了,是不是很酷呢?

3.4 后续工作

这个同步微信的项目是 Apache 2.0 协议开源的,并且现在由Frost Ming在维护,这里还有很多待增强、实现的新功能、新任务,欢迎大家来试玩、贡献。让我们一起把开源社区的沟通做的多一点开放、少一点封闭吧~

项目地址 👉🏻 https://github.com/wey-gu/chatroom-syncer.

4 结论

有效的沟通是成功的开源社区的基石,因为它让协作、分享思想与知识、以及所有成员的参与成为可能。为了确保沟通透明、包容和有效,对于开源社区来说,让所有成员有机会参与讨论和决策以及公开自由地分享相关信息是非常重要的。

我们 NebulaGraph 社区的建设者/贡献者将继续寻找和黑客方法,以开放和良好的方式使人们连接在一起,和大家共建更好的开源、技术社区。

题图版权:Artem Beliaikin

图数据库的社交网络应用

2022年12月8日 15:30

图数据库的社交网络应用

本文是一个基于 NebulaGraph 上解决社交网络问题的常规方法综述。其中介绍的方法提供都了 Playground 供大家学习、玩耍。

社交网络大家都不陌生,无论是微信、微博、B 站还是大众点评、知乎、陌陌等服务,其本质上的用户都形成了社交网络。

在一个社交网络系统中,我们可以用图数据库来表示用户和他们的连接关系。图数据库能允许对用户之间的关系进行有效的查询,使得各种基于连接查找、统计、分析的社交网络上的业务实现变得可行、高效。

例如,图形数据库可以用来识别网络中的“有影响力用户”,或者根据用户之间的共同点对新的连接(好友关系、关心的内容)进行推荐,再或者寻找社群中相聚集的不同人群、社区,进行用户画像。图形数据库因为在能支撑复杂多跳查询的同时也能支持实时写入、更新,使其非常适合应用在用户关系不断变化的社交网络系统之上。

1 图建模

为了给出一些常见社交场景的应用示例,我会把大多数例子建立在一个典型的小型社交网络上,社交网络天然就是一张网络、图的形态。

为此,我在 NebulaGraph 官方示例数据集:篮球运动员之上,增加了三种点:

  • 地址
  • 地点
  • 文章

五种边:

  • 发文
  • 评论
  • 住在
  • 属于(地点)

它的建模非常自然:

schema_sketch

2 数据导入

2.1 加载默认数据集

首先,我们加载默认的 basketballplayer 数据集。

  • 在命令行 console 之中,我们只需要执行 :play basketballplayer 就可以。

  • 而在 NebulaGraph Studio/Explorer 之中,我们可以在欢迎页点击下载就部署这份基础数据集。

    baskertballplayer_studio_starter

2.2 加载社交网络 schema

其次我们执行下边的语句,首先是 Schema 定义的语句:

sql

CREATE TAG IF NOT EXISTS post(title string NOT NULL);
CREATE EDGE created_post(post_time timestamp);
CREATE EDGE commented_at(post_time timestamp);
CREATE TAG address(address string NOT NULL, `geo_point` geography(point));
CREATE TAG place(name string NOT NULL, `geo_point` geography(point));
CREATE EDGE belong_to();
CREATE EDGE lived_in();

2.3 加载数据

然后,在等两个心跳时间以上之后(20秒),我们可以执行数据插入:

sql

INSERT VERTEX post(title) values \
    "post1":("a beautify flower"), "post2":("my first bike"), "post3":("I can swim"), \
    "post4":("I love you, Dad"), "post5":("I hate coriander"), "post6":("my best friend, tom"), \
    "post7":("my best friend, jerry"), "post8":("Frank, the cat"), "post9":("sushi rocks"), \
    "post10":("I love you, Mom"), "post11":("Let's have a party!");

INSERT EDGE created_post(post_time) values \
    "player100"->"post1":(timestamp("2019-01-01 00:30:06")), \
    "player111"->"post2":(timestamp("2016-11-23 10:04:50")), \
    "player101"->"post3":(timestamp("2019-11-11 10:44:06")), \
    "player103"->"post4":(timestamp("2014-12-01 20:45:11")), \
    "player102"->"post5":(timestamp("2015-03-01 00:30:06")), \
    "player104"->"post6":(timestamp("2017-09-21 23:30:06")), \
    "player125"->"post7":(timestamp("2018-01-01 00:44:23")), \
    "player106"->"post8":(timestamp("2019-01-01 00:30:06")), \
    "player117"->"post9":(timestamp("2022-01-01 22:23:30")), \
    "player108"->"post10":(timestamp("2011-01-01 10:00:30")), \
    "player100"->"post11":(timestamp("2021-11-01 11:10:30"));

INSERT EDGE commented_at(post_time) values \
    "player105"->"post1":(timestamp("2019-01-02 00:30:06")), \
    "player109"->"post1":(timestamp("2016-11-24 10:04:50")), \
    "player113"->"post3":(timestamp("2019-11-13 10:44:06")), \
    "player101"->"post4":(timestamp("2014-12-04 20:45:11")), \
    "player102"->"post1":(timestamp("2015-03-03 00:30:06")), \
    "player103"->"post1":(timestamp("2017-09-23 23:30:06")), \
    "player102"->"post7":(timestamp("2018-01-04 00:44:23")), \
    "player101"->"post8":(timestamp("2019-01-04 00:30:06")), \
    "player106"->"post9":(timestamp("2022-01-02 22:23:30")), \
    "player105"->"post10":(timestamp("2011-01-11 10:00:30")), \
    "player130"->"post1":(timestamp("2019-01-02 00:30:06")), \
    "player131"->"post2":(timestamp("2016-11-24 10:04:50")), \
    "player131"->"post3":(timestamp("2019-11-13 10:44:06")), \
    "player133"->"post4":(timestamp("2014-12-04 20:45:11")), \
    "player132"->"post5":(timestamp("2015-03-03 00:30:06")), \
    "player134"->"post6":(timestamp("2017-09-23 23:30:06")), \
    "player135"->"post7":(timestamp("2018-01-04 00:44:23")), \
    "player136"->"post8":(timestamp("2019-01-04 00:30:06")), \
    "player137"->"post9":(timestamp("2022-01-02 22:23:30")), \
    "player138"->"post10":(timestamp("2011-01-11 10:00:30")), \
    "player141"->"post1":(timestamp("2019-01-03 00:30:06")), \
    "player142"->"post2":(timestamp("2016-11-25 10:04:50")), \
    "player143"->"post3":(timestamp("2019-11-14 10:44:06")), \
    "player144"->"post4":(timestamp("2014-12-05 20:45:11")), \
    "player145"->"post5":(timestamp("2015-03-04 00:30:06")), \
    "player146"->"post6":(timestamp("2017-09-24 23:30:06")), \
    "player147"->"post7":(timestamp("2018-01-05 00:44:23")), \
    "player148"->"post8":(timestamp("2019-01-05 00:30:06")), \
    "player139"->"post9":(timestamp("2022-01-03 22:23:30")), \
    "player140"->"post10":(timestamp("2011-01-12 10:01:30")), \
    "player141"->"post1":(timestamp("2019-01-04 00:34:06")), \
    "player102"->"post2":(timestamp("2016-11-26 10:06:50")), \
    "player103"->"post3":(timestamp("2019-11-15 10:45:06")), \
    "player104"->"post4":(timestamp("2014-12-06 20:47:11")), \
    "player105"->"post5":(timestamp("2015-03-05 00:32:06")), \
    "player106"->"post6":(timestamp("2017-09-25 23:31:06")), \
    "player107"->"post7":(timestamp("2018-01-06 00:46:23")), \
    "player118"->"post8":(timestamp("2019-01-06 00:35:06")), \
    "player119"->"post9":(timestamp("2022-01-04 22:26:30")), \
    "player110"->"post10":(timestamp("2011-01-15 10:00:30")), \
    "player111"->"post1":(timestamp("2019-01-06 00:30:06")), \
    "player104"->"post11":(timestamp("2022-01-15 10:00:30")), \
    "player125"->"post11":(timestamp("2022-02-15 10:00:30")), \
    "player113"->"post11":(timestamp("2022-03-15 10:00:30")), \
    "player102"->"post11":(timestamp("2022-04-15 10:00:30")), \
    "player108"->"post11":(timestamp("2022-05-15 10:00:30"));

INSERT VERTEX `address` (`address`, `geo_point`) VALUES \
    "addr_0":("Brittany Forge Apt. 718 East Eric  WV 97881", ST_Point(1,2)),\
    "addr_1":("Richard Curve Kingstad  AZ 05660", ST_Point(3,4)),\
    "addr_2":("Schmidt Key Lake Charles  AL 36174", ST_Point(13.13,-87.65)),\
    "addr_3":("5 Joanna Key Suite 704 Frankshire  OK 03035", ST_Point(5,6)),\
    "addr_4":("1 Payne Circle Mitchellfort  LA 73053", ST_Point(7,8)),\
    "addr_5":("2 Klein Mission New Annetteton  HI 05775", ST_Point(9,10)),\
    "addr_6":("1 Vanessa Stravenue Suite 184 Baileyville  NY 46381", ST_Point(11,12)),\
    "addr_7":("John Garden Port John  LA 54602", ST_Point(13,14)),\
    "addr_8":("11 Webb Groves Tiffanyside  MN 14566", ST_Point(15,16)),\
    "addr_9":("70 Robinson Locks Suite 113 East Veronica  ND 87845", ST_Point(17,18)),\
    "addr_10":("24 Mcknight Port Apt. 028 Sarahborough  MD 38195", ST_Point(19,20)),\
    "addr_11":("0337 Mason Corner Apt. 900 Toddmouth  FL 61464", ST_Point(21,22)),\
    "addr_12":("7 Davis Station Apt. 691 Pittmanfort  HI 29746", ST_Point(23,24)),\
    "addr_13":("1 Southport Street Apt. 098 Westport  KY 85907", ST_Point(120.12,30.16)),\
    "addr_14":("Weber Unions Eddieland  MT 64619", ST_Point(25,26)),\
    "addr_15":("1 Amanda Freeway Lisaland  NJ 94933", ST_Point(27,28)),\
    "addr_16":("2 Klein HI 05775", ST_Point(9,10)),\
    "addr_17":("Schmidt Key Lake Charles AL 13617", ST_Point(13.12, -87.60)),\
    "addr_18":("Rodriguez Track East Connorfort  NC 63144", ST_Point(29,30));

INSERT VERTEX `place` (`name`, `geo_point`) VALUES \
    "WV":("West Virginia", ST_Point(1,2.5)),\
    "AZ":("Arizona", ST_Point(3,4.5)),\
    "AL":("Alabama", ST_Point(13.13,-87)),\
    "OK":("Oklahoma", ST_Point(5,6.1)),\
    "LA":("Louisiana", ST_Point(7,8.1)),\
    "HI":("Hawaii", ST_Point(9,10.1)),\
    "NY":("New York", ST_Point(11,12.1)),\
    "MN":("Minnesota", ST_Point(15,16.1)),\
    "ND":("North Dakota", ST_Point(17,18.1)),\
    "FL":("Florida", ST_Point(21,22.1)),\
    "KY":("Kentucky", ST_Point(120.12,30)),\
    "MT":("Montana", ST_Point(25,26.1)),\
    "NJ":("New Jersey", ST_Point(27,28.1)),\
    "NC":("North Carolina", ST_Point(29,30.1));

INSERT EDGE `belong_to`() VALUES \
    "addr_0"->"WV":(),\
    "addr_1"->"AZ":(),\
    "addr_2"->"AL":(),\
    "addr_3"->"OK":(),\
    "addr_4"->"LA":(),\
    "addr_5"->"HI":(),\
    "addr_6"->"NY":(),\
    "addr_7"->"LA":(),\
    "addr_8"->"MN":(),\
    "addr_9"->"ND":(),\
    "addr_10"->"MD":(),\
    "addr_11"->"FL":(),\
    "addr_12"->"HI":(),\
    "addr_13"->"KY":(),\
    "addr_14"->"MT":(),\
    "addr_15"->"NJ":(),\
    "addr_16"->"HI":(),\
    "addr_17"->"AL":(),\
    "addr_18"->"NC":();

INSERT EDGE `lived_in`() VALUES \
    "player100"->"addr_4":(),\
    "player101"->"addr_7":(),\
    "player102"->"addr_2":(),\
    "player103"->"addr_3":(),\
    "player104"->"addr_0":(),\
    "player105"->"addr_5":(),\
    "player106"->"addr_6":(),\
    "player107"->"addr_1":(),\
    "player108"->"addr_8":(),\
    "player109"->"addr_9":(),\
    "player110"->"addr_10":(),\
    "player111"->"addr_11":(),\
    "player112"->"addr_12":(),\
    "player113"->"addr_13":(),\
    "player114"->"addr_14":(),\
    "player115"->"addr_15":(),\
    "player116"->"addr_16":(),\
    "player117"->"addr_17":(),\
    "player118"->"addr_18":();

2.4 数据初探

首先,我们看看数据统计

sql

[basketballplayer]> SUBMIT JOB STATS;

+------------+
| New Job Id |
+------------+
| 10         |
+------------+
[basketballplayer]> SHOW STATS;
+---------+----------------+-------+
| Type    | Name           | Count |
+---------+----------------+-------+
| "Tag"   | "address"      | 19    |
| "Tag"   | "place"        | 14    |
| "Tag"   | "player"       | 51    |
| "Tag"   | "post"         | 10    |
| "Tag"   | "team"         | 30    |
| "Edge"  | "belong_to"    | 19    |
| "Edge"  | "commented_at" | 40    |
| "Edge"  | "created_post" | 10    |
| "Edge"  | "follow"       | 81    |
| "Edge"  | "lived_in"     | 19    |
| "Edge"  | "serve"        | 152   |
| "Space" | "vertices"     | 124   |
| "Space" | "edges"        | 321   |
+---------+----------------+-------+
Got 13 rows (time spent 1038/51372 us)

查一下所有的数据

sql

MATCH ()-[e]->() RETURN e LIMIT 10000

因为数据量太小了,所以可以把所有数据在 NebulaGraph Explorer 中渲染出来:

match_all

3 找出网络中的关键人物

识别社交网络中的有影响的关键人物们(influencers)涉及使用各种指标和方法来识别在特定网络中拥有大量影响力的个人。这对很多业务场景都有帮助都很有用,比如用于营销或研究网络中的信息传播。

识别他们的方法有很多,具体的方法和考量的信息、关系、角度也取决于这些关键人物的类型、和获取他们的目的。

一些常见的方法包括看一个人拥有的粉丝或内容被消费的数量,他们在其帖子、视频上读者的参与度,以及他们的内容的影响力(转发、引用)。这些方法在图上也是可以做的,但是比较平凡,我就不举例了,在这里,我们可以试着用评估、计算节点重要性的图算法,在图上得出这些关键人物。

3.1 PageRank

PageRank 是一个非常“古老的”图算法,它通过考虑图上点之间的关系数量去迭代,得到每一个点的得分(Rank),最初由 Google 的创始人 Larry Page 和 Sergey Brin 提出并应用在早期的 Google 搜索引擎中,用来排序搜索结果,这里的 Page 可以是 Larry Page 的姓和 Web Page 的双关了。

在现代、复杂的搜索引擎中,PageRank 早就因为其过于简单而被弃用,但是在其他图结构网络场景中,PageRank 仍然在发光发热,社交网络中我们可以粗略地认为所有链接的重要程度类似,去运行这个算法找出那些关键的用户。

在 NebulaGraph 中,我们可以利用 NebulaGraph Algorithm、NebulaGraph Analytics 去在大的全图上运行 PageRank,而在日常的分析、验证、设计阶段,我们不需要在全量数据上跑结果,而在很小的子图上(最多上万),我们可以轻松地在浏览器里边运行各种图算法去得出线上业务可以用的方法。

今天,我们就用 NebulaGraph Explorer 内置的浏览器内图算法功能执行一下 PageRank 看看(具体方法这里略去,可以参考文档,不过其实就是点一下鼠标的事儿):

PageRank

我们可以从上边看到,PageRank 计算之后所有绿色的 player(人)中,“player.name: Tim Duncan” 是最大的一个点,与之相关联的关系看起来的确不少,我们在图上选择他,再右键反选,选择除了 Tim Duncan 之外的所有点,用退格键删除所有其他的点,然后在他作为起点双向探索出1到5步,可以得到 Tim Duncan 的子图:

TimDuncan

从子图中可以看到 Tim Duncan 和非常多其他球员有关注的关系的同时,一些其他很受欢迎的队员和他一起一样服役过非常热门的热刺(Spurs)队,这些都印证了 PageRank 的评估方式。

现在我们再看看其他判定维度下的算法会不会得出一样的结论呢?

3.2 Betweenness Centrality

作为另一个流行的节点重要性算法,通过计算一个节点对于图中的中介、桥梁作用来衡量节点的重要性,这里的桥梁作用是有数学定义的量化算法,这里就不展开说了,不过从感官上可以看出它是另一个角度很符合直觉地去评估重要性的方法。

我们重新在画布上查询所有的点边之后,在浏览器里运行 Betweenness Centrality 算法,这次的结果是:

Betweeness_centrality

从它的五跳内子图可以看出,与之前 PageRank 所得的关键人物 Tim Duncan 呈现的星星状态不同,Dejounte Murray 的子图呈现簇状,在感官、直觉上可以想象 Dejounte Murray 真的在很多节点之间的最小路径的必经之路上,而 Tim Duncan 则似乎和更多的重要连接者产生了关联。

DejounteMurray

在实际的应用场景中,我们通常要通过不同方式的定义的理解、不同执行结果的试验、分析去找到我们关注的关键人物产生影响的结构特征,用来针对不同需求选择不同的算法。

4 找出社区、聚集群体

社交网络中的社区检测是一种通过分析社交关系来发现社区结构的技术。社区结构是指在社交网络、图谱中相互联系密切的一组节点,这些节点通常具有相似的特征或兴趣。例如,社区结构可能表现为用户根据共同的话题或兴趣聚集在一起的一组用户。

社区检测的目的是通过对社交网络进行分析,找出不同社区的边界,并确定每个社区中的节点。这一过程可以通过使用各种算法来完成,例如标签传播算法、弱联通分量算法和 Louvain 算法等。通过发现社区结构,可以更好地了解社交网络的结构和特征,并有助于社交网络服务提供方更好地推断和预测社交网络中的行为,帮助做好社交网络的治理、广告投放、市场营销等。

由于我们的数据集是非真实的,我在不同的算法之下得出的结果并不能展现出真实的意涵,所以本章只是展示一下利用几个图算法进行社区识别之后的结果,在真实世界的案例中,我们还应该在此基础之上利用领域知识或者其他技术手段协同给出不同群体、社区的画像、标签。

标签传播算法效果:

LPA

Louvain 算法效果:

Louvain

弱联通分量算法效果:

WCC

在后边的章节,我们有机会可以在更小、更简单的子图上再次验证这几个算法,结果会更有可解释性一些。

5 好友亲密度

通过社区识别算法,其实是能够在一定程度上,在全局计算获得兴趣相近、关联紧密的好友的。那么如何获得一个给定用户的其他亲密好友呢?我们可以通过计算这个用户的好友中,和他共同好友的个数来排序获得这一信息!

我们拿 “Tim Duncan” 举例,我们知道,他的两度好友(好友的好友:(:player{name: "Tim Duncan"})-[:follow]-(f:player)-[:follow]-(fof:player))如果同时也是他的好友的话,那么他们这个中间的好友就是他和这个朋友的共同好友(Mutual Friend),那么有理由相信那些和 Tim Duncan 有更多共同好友的人可能跟他有更高亲密度:

cypher

MATCH (start:`player`{name: "Tim Duncan"})-[:`follow`]-(f:`player`)-[:`follow`]-(fof:`player`),
(start:`player`)-[:`follow`]-(fof:`player`)
RETURN fof.`player`.name, count(DISTINCT f) AS NrOfMutualF ORDER BY NrOfMutualF DESC;

这个计算结果是,“Tony Parker” 和 Tim 有 5 个共同好友,最为亲密。

fof.player.name NrOfMutualF
Tony Parker 5
Dejounte Murray 4
Manu Ginobili 3
Marco Belinelli 3
Danny Green 2
Boris Diaw 1
LaMarcus Aldridge 1
Tiago Splitter 1

下面,咱们通过可视化来验证一下这个结果吧!

先看看每一个好友的共同好友(f:)都是谁?

cypher

MATCH (start:player{name: "Tim Duncan"})-[:`follow`]-(f:player)-[:`follow`]-(fof:player),
(start:player)-[:`follow`]-(fof:player)
RETURN fof.player.name, collect(DISTINCT f.player.name);

结果如下:

fof.player.name collect(distinct f.player.name)
Boris Diaw [“Tony Parker”]
Manu Ginobili [“Dejounte Murray”, “Tiago Splitter”, “Tony Parker”]
LaMarcus Aldridge [“Tony Parker”]
Tiago Splitter [“Manu Ginobili”]
Tony Parker [“Dejounte Murray”, “Boris Diaw”, “Manu Ginobili”, “Marco Belinelli”, “LaMarcus Aldridge”]
Dejounte Murray [“Danny Green”, “Tony Parker”, “Manu Ginobili”, “Marco Belinelli”]
Danny Green [“Dejounte Murray”, “Marco Belinelli”]
Marco Belinelli [“Dejounte Murray”, “Danny Green”, “Tony Parker”]

然后我们在 Explorer 上可视化一下这个结果:

  • 首先,我们把 Tim 的量度好友路径全查出来

cypher

MATCH p=(start:player{name: "Tim Duncan"})-[:`follow`]-(f:player)-[:follow]-(fof:player)
RETURN p
  • 然后我们在其中按照度去渲染节点大小,并选中 Tim 和 Tony,并在两者之间查询 follow 类型边、双向、最多 2 跳的全部路径:

可以看出他们之间是最亲密的朋友没跑了,而且他们的共同好友也在路径之中:

["Dejounte Murray", "Boris Diaw", "Manu Ginobili", "Marco Belinelli", "LaMarcus Aldridge"]

closest_friend

5.1 朋友圈子里的小群体

这时候,如前边提到,这份数据集本身的非真实性,使得社区发现算法的结果不能得到其中洞察的内涵,现在我们可以接着这个小的子图,看看 Tim 的好友中可以如何区分群组、社区呢,咱们跑一个 Louvain 、弱联通分量、标签传播看看:

  • 弱联通分量,可以把 Tim 等朋友们大体分割出两三个相互不连通的部分,非常符合连通分量的直观理解和定义。

Tim_wcc

  • 标签传播,我们可以通过控制迭代次数按需去通过随机的传播划定出不同的划分度,结果可以有一定的区分度:

    20 次迭代

    Tim_LPA

    1000 次迭代

    Tim_LPA_1000

  • Louvain,是一个比较高效、稳定的算法,基本上在这个子图下我们可以在很小的迭代次数下得到很符合直觉的划分:

Tim_Louvain

6 新朋友推荐

接着前边二度朋友(朋友的朋友)的思路,我们可以很容易把那些还不是朋友的二度朋友作为推荐添加的好友,而排序规则则是他们之间的共同好友数量:

cypher

MATCH (start:player{name: "Tim Duncan"})-[:`follow`]-(f:player)-[:`follow`]-(fof:player)
WHERE NOT (start:player)-[:`follow`]-(fof:player) AND fof != start
RETURN fof.player.name, count(DISTINCT f) AS NrOfMutualF ORDER BY NrOfMutualF DESC;
fof.player.name NrOfMutualF
LeBron James 2
James Harden 1
Chris Paul 1
Yao Ming 1
Damian Lillard 1
JaVale McGee 1
Kevin Durant 1
Kyle Anderson 1
Rudy Gay 1
Russell Westbrook 1

显然,LeBron 最值得推荐!再看看这些共同好友都是谁?

fof.player.name collect(distinct f.player.name)
James Harden [“Dejounte Murray”]
LeBron James [“Danny Green”, “Dejounte Murray”]
Chris Paul [“Dejounte Murray”]
Yao Ming [“Shaquille O’Neal”]
Damian Lillard [“LaMarcus Aldridge”]
JaVale McGee [“Shaquille O’Neal”]
Kevin Durant [“Dejounte Murray”]
Kyle Anderson [“Dejounte Murray”]
Rudy Gay [“LaMarcus Aldridge”]
Russell Westbrook [“Dejounte Murray”]

同样,我们在刚才的子图里找找 LeBron James 吧!我们把它俩之间的两步、双向路径找出来,果然只会经过 ["Danny Green", "Dejounte Murray"] 并且,没有直接的连接:

Tim_newFriend

现在,系统会给两边发提醒:“hey,也许你们应该交个朋友!”

7 共同邻居

查找共同邻居是一个很常见的图库查询,它的场景可能根据不同的邻居关系,节点类型,同构、异构,带来不同的场景,前边两个场景下的共同好友本质上是两点之间的共同邻居,直接查询这样的关系用 OpenCypher 的表达非常简单。

7.1 两点之间的共同邻居

比如这个表达可以查询两个用户之间的共性、交集,结果可能是共同团队、去过的地方、兴趣爱好、共同参与的帖子回复等等:

cypher

MATCH p = (`v0`)--()--(`v1`)
WHERE id(`v0`) == "player100" AND id(`v1`) == "player104"
RETURN p

而限定了边的类型之后,这个查询就限定在共同好友的查询了。

cypher

MATCH p = (v0)--(:`follow`)--(v1)
WHERE id(v0) == "player100" AND id(v1) == "player104"
RETURN p

7.2 多点之间的共同邻居:内容推送

下面,我们给出一个多点共同邻居的场景,我们从一个文章触发,查出所有在这个文章上有互动的用户,找到这一群体中的共同邻居。

这个共同邻居有什么用处呢?很自然,如果这个共同邻居还没有和这个文章有任何交互,我们可以把这个文章推荐给他。

这个查询的实现很有意思:

  • 第一个 MATCH 是查到所有 post11 文章下留言和作者这些人的总人数
  • 在第二个 MATCH 之后,我们查到所有这群人的一度好友路径中,这些文章过的交互用户的一度好友的参与过文章的朋友数量刚好等于这个参与文章的用户的数量的这些人,他们其实就是这些所有参与用户的共同好友。

cypher

MATCH (blog:post)<-[e]-(:player) WHERE id(blog) == "post11"
WITH blog, count(e) AS invoved_user_count
MATCH (blog:post)<-[]-(users:player)-[:`follow`]-(common_neighbor:player)
WITH toSet(collect(users)) AS users, common_neighbor, invoved_user_count
WHERE size(users) == invoved_user_count
RETURN common_neighbor

而这个人就是…Tony!

cypher

+-----------------------------------------------------+
| common_neighbor                                     |
+-----------------------------------------------------+
| ("player101" :player{age: 36, name: "Tony Parker"}) |
+-----------------------------------------------------+

而我们可以很容易在可视化中国验证它:

cypher

MATCH p=(blog:post)<-[]-(users:player)-[:`follow`]-(common_neighbor:player)
WHERE id(blog) == "post11"
RETURN p

渲染这个查询结果,然后再这篇叫做 “Let’s have a party!” 的文章与 Tony 之间查找评论、po文、关注三类边的双向、两跳查询,就可以看到这些参与文章的人们无一例外,都是 Tony 的好友,而只有 Tony 自己还没去文章里留言!

而 Party 怎么可以少了 Tony 呢?难道是他的惊喜生日 Party,Opps,我们是不是不应该告诉他?

common_nbrs_tony

8 信息流

我在之前写过基于图技术的推荐系统实现方法,其中描述了现代推荐系统中内容过滤、排序方法可以在图谱上进行,社交网络中有一点相似但又不同的场景是信息流(Feed),它的产生类似于推荐系统中的个性化,同时有具有很高的时效性,借助于包含了内容行为知识的社交图谱可以很直观、高效去实现个性化的信息流生成。

8.1 好友参与的内容

最简单、直接的信息流定义可能就是在朋友圈、微博 feed 上刷一下关注的人创建、参与的内容列表了,先不考虑排序的问题,这些内容一定是:

  • 一定时间段内好友创建的内容
  • 一定时间端内好友评论的内容

我们可以用 cypher 表达这个查询用户 id 为 player100 的信息流:

cypher

MATCH (feed_owner:player)-[:`follow`]-(friend:player) WHERE id(feed_owner) == "player100"
OPTIONAL MATCH (friend:player)-[newly_commented:commented_at]->(:post)<-[:created_post]-(feed_owner:player)
    WHERE newly_commented.post_time > timestamp("2010-01-01 00:00:00")
OPTIONAL MATCH (friend:player)-[newly_created:created_post]->(po:post)
    WHERE newly_created.post_time > timestamp("2010-01-01 00:00:00")
WITH DISTINCT friend,
    collect(DISTINCT po.post.title) + collect("comment of " + dst(newly_commented))
        AS feeds WHERE size(feeds) > 0
RETURN friend.player.name, feeds
friend.player.name feeds
Boris Diaw [“I love you, Mom”, “comment of post11”]
Marco Belinelli [“my best friend, tom”, “comment of post11”]
Danny Green [“comment of post1”]
Tiago Splitter [“comment of post1”]
Dejounte Murray [“comment of post11”]
Tony Parker [“I can swim”]
LaMarcus Aldridge [“I hate coriander”, “comment of post11”, “comment of post1”]
Manu Ginobili [“my best friend, jerry”, “comment of post11”, “comment of post11”]

于是,我们可以把这些评论、文章发送到用户的 feed 之上了。

我们也来看看他们在图上的样子吧,我们输出所有查到的路径:

cypher

MATCH p=(feed_owner:player)-[:`follow`]-(friend:player) WHERE id(feed_owner) == "player100"
OPTIONAL MATCH p_comment=(friend:player)-[newly_commented:commented_at]->(:post)<-[:created_post]-(feed_owner:player)
    WHERE newly_commented.post_time > timestamp("2010-01-01 00:00:00")
OPTIONAL MATCH p_post=(friend:player)-[newly_created:created_post]->(po:post)
    WHERE newly_created.post_time > timestamp("2010-01-01 00:00:00")
RETURN p, p_comment, p_post

渲染在 Explorer 上,选择“神经网络”这个布局,可以很清晰看出这些粉色的文章节点,还有代表评论的边。

feed_from_friends

8.2 附近好友的内容

我们再进一步,把地理信息考虑进来,获取那些住址的经纬度小于一定距离朋友相关的内容。

这里,我们用到了 NebulaGraph 的 GeoSpatial 地理功能,ST_Distance(home.address.geo_point, friend_addr.address.geo_point) AS distance WHERE distance < 1000000 的约束条件帮我们表达了距离的限制。

cypher

MATCH (home:address)-[:lived_in]-(feed_owner:player)-[:`follow`]-(friend:player)-[:lived_in]-(friend_addr:address)
    WHERE id(feed_owner) == "player100"
WITH feed_owner, friend, ST_Distance(home.address.geo_point, friend_addr.address.geo_point) AS distance WHERE distance < 1000000

OPTIONAL MATCH (friend:player)-[newly_commented:commented_at]->(:post)<-[:created_post]-(feed_owner:player)
    WHERE newly_commented.post_time > timestamp("2010-01-01 00:00:00")
OPTIONAL MATCH (friend:player)-[newly_created:created_post]->(po:post)
    WHERE newly_created.post_time > timestamp("2010-01-01 00:00:00")
WITH DISTINCT friend,
    collect(DISTINCT po.post.title) + collect("comment of " + dst(newly_commented))
        AS feeds WHERE size(feeds) > 0
RETURN friend.player.name, feeds
friend.player.name feeds
Marco Belinelli [“my best friend, tom”, “comment of post11”]
Tony Parker [“I can swim”]
Danny Green [“comment of post1”]

这时候,从可视化这个结果也可以看到住址这一关系,以及它们的经纬度信息,我手动根据它们的经纬度,把地址的节点在图上排布了一下可以看到这个 feed 的主人 Tim(player100) 的住址(7,8)刚好在其他好友住址的中间位置,这些临近好友的相关的文章和参与评论的内容将被作为信息流推送给 Tim:

geo_feed

9 时空关系追踪

时空关系追踪这个图谱应用是在公共安全、物流、疫情防控等场景下,利用图遍历将繁杂、凌乱的信息充分利用起来的典型应用。当我们建立起这样的图谱之后往往只需要简单的图查询就可以获得非常有用的洞察。本章节我给大家距离一下这个应用场景。

9.1 数据集

为此,我创建了一个虚拟的数据集由来构建一个时空关系图谱。数据集的生成程序和一份可以直接用的文件都放在了 GitHub 上,仓库地址是: https://github.com/wey-gu/covid-track-graph-datagen

它的数据建模如下:

schema_covid

在一个全新的环境里,我们可以用下边的 3 行命令就准备好这个图谱:

bash

# 安装 NebulaGraph + NebulaGraph Studio
curl -fsSL nebula-up.siwei.io/install.sh | bash -s -- v3
# 下载数据集
git clone https://github.com/wey-gu/covid-track-graph-datagen && cd covid-track-graph-datagen
# 导入数据集
docker run --rm -ti \
    --network=nebula-net \
    -v ${PWD}/:/root \
    vesoft/nebula-importer:v3.2.0 \
    --config /root/nebula-importer-config.yaml

然后我们在 console 里查看一下数据

bash

~/.nebula-up/console.sh
# 进入 console 了,进到 covid_trace 图空间(刚才创建的)
USE covid_trace;
# 执行数据统计的任务
SHOW JOB STATS

结果:

bash

(root@nebula) [covid_trace]> SHOW STATS
+---------+------------+--------+
| Type    | Name       | Count  |
+---------+------------+--------+
| "Tag"   | "人"       | 10000  |
| "Tag"   | "地址"     | 1000   |
| "Tag"   | "城市"     | 341    |
| "Tag"   | "村镇"     | 42950  |
| "Tag"   | "省份"     | 32     |
| "Tag"   | "联系方式" | 0      |
| "Tag"   | "行政区"   | 3134   |
| "Tag"   | "街道"     | 667911 |
| "Edge"  | "住址"     | 0      |
| "Edge"  | "到访"     | 19986  |
| "Edge"  | "同住"     | 19998  |
| "Edge"  | "属于"     | 715336 |
| "Space" | "vertices" | 725368 |
| "Space" | "edges"    | 755320 |
+---------+------------+--------+
Got 14 rows (time spent 1087/46271 us)

9.2 两人之间的关联

很自然,利用路径查询就可以了:

cypher

# 最短
FIND SHORTEST PATH FROM "p_100" TO "p_101" OVER * BIDIRECT YIELD PATH AS paths

# 所有路径
FIND ALL PATH FROM "p_100" TO "p_101" OVER * BIDIRECT YIELD PATH AS paths | LIMIT 10

最短路径结果:

paths
<(“p_100”)<-[:同住@0 {}]-(“p_2136”)<-[:同住@0 {}]-(“p_3708”)-[:到访@0 {}]->(“a_125”)<-[:到访@0 {}]-(“p_101”)>

所有路径结果:

paths
<(“p_100”)<-[:同住@0 {}]-(“p_2136”)<-[:同住@0 {}]-(“p_3708”)-[:到访@0 {}]->(“a_125”)<-[:到访@0 {}]-(“p_101”)>
<(“p_100”)-[:到访@0 {}]->(“a_328”)<-[:到访@0 {}]-(“p_6976”)<-[:同住@0 {}]-(“p_261”)-[:到访@0 {}]->(“a_352”)<-[:到访@0 {}]-(“p_101”)>
<(“p_100”)-[:同住@0 {}]->(“p_8709”)-[:同住@0 {}]->(“p_9315”)-[:同住@0 {}]->(“p_261”)-[:到访@0 {}]->(“a_352”)<-[:到访@0 {}]-(“p_101”)>
<(“p_100”)-[:到访@0 {}]->(“a_328”)<-[:到访@0 {}]-(“p_6311”)-[:同住@0 {}]->(“p_3941”)-[:到访@0 {}]->(“a_345”)<-[:到访@0 {}]-(“p_101”)>
<(“p_100”)-[:到访@0 {}]->(“a_328”)<-[:到访@0 {}]-(“p_5046”)-[:同住@0 {}]->(“p_3993”)-[:到访@0 {}]->(“a_144”)<-[:到访@0 {}]-(“p_101”)>
<(“p_100”)-[:同住@0 {}]->(“p_3457”)-[:到访@0 {}]->(“a_199”)<-[:到访@0 {}]-(“p_6771”)-[:到访@0 {}]->(“a_458”)<-[:到访@0 {}]-(“p_101”)>
<(“p_100”)<-[:同住@0 {}]-(“p_1462”)-[:到访@0 {}]->(“a_922”)<-[:到访@0 {}]-(“p_5869”)-[:到访@0 {}]->(“a_345”)<-[:到访@0 {}]-(“p_101”)>
<(“p_100”)<-[:同住@0 {}]-(“p_9489”)-[:到访@0 {}]->(“a_985”)<-[:到访@0 {}]-(“p_2733”)-[:到访@0 {}]->(“a_458”)<-[:到访@0 {}]-(“p_101”)>
<(“p_100”)<-[:同住@0 {}]-(“p_9489”)-[:到访@0 {}]->(“a_905”)<-[:到访@0 {}]-(“p_2733”)-[:到访@0 {}]->(“a_458”)<-[:到访@0 {}]-(“p_101”)>
<(“p_100”)-[:到访@0 {}]->(“a_89”)<-[:到访@0 {}]-(“p_1333”)<-[:同住@0 {}]-(“p_1683”)-[:到访@0 {}]->(“a_345”)<-[:到访@0 {}]-(“p_101”)>

我们把所有路径进行可视化渲染,标记出起点终点的两人,并在其中查到他们的最短路径,他们之间的千丝万缕关系就一目了然了,无论是商业洞察、公共安全还是疫情防控的目的,有了这个信息,相应的工作都可以如虎添翼地向下进展。

find_path_two_people

当然,在真实的系统上,可能我们只需要关心两个用户之间的关联远近,得出量化的评估:

cypher

FIND SHORTEST PATH FROM "p_100" TO "p_101" OVER * BIDIRECT YIELD PATH AS paths |
    YIELD collect(length($-.paths)) AS len | YIELD coalesce($-.len[0], -1) AS len

结果中我们只关心他们之间最短路径的长度为:4。

len
4

9.3 时空相交的人

进一步我们可以用图语义勾勒出我们想确定的任何带有时间与空间信息的模式,在图谱中实时查询出来,比如对给定的人,他的 id 是 p_101,我们相差在特定时间里所有和他有时空相交的人,这意味着那些人在 p_101 访问某一地方的时间段之内也逗留、访问了这些地方:

cypher

MATCH (p:)-[`visit0`:到访]->(`addr`:地址)<-[`visit1`:到访]-(p1:)
    WHERE id(p) == "p_101" AND `visit0`.`start_time` < `visit1`.`end_time` AND `visit0`.`end_time` > `visit1`.`start_time`
    RETURN `addr`.地址.`name`, collect(p1..`name`)

我们得到了再每一个到访地点的时空相交人列表如下:

addr.地址.name collect(p1.人.name)
闵行仇路q座 255960 [“徐畅”, “王佳”, “曾亮”, “姜桂香”, “邵秀英”, “韦婷婷”, “陶玉”, “马坤”, “黄想”, “张秀芳”, “颜桂芳”, “张洋”]
丰都北京路J座 725701 [“陈春梅”, “施婷婷”, “井成”, “范文”, “王楠”, “尚明”, “薛秀珍”, “宋金凤”, “杨雪”, “邓丽华”, “李杨”, “温佳”, “叶玉”, “周明”, “王桂珍”, “段玉华”, “金成”, “黄鑫”, “邬兵”, “魏柳”, “王兰英”, “杨柳”]
普陀潜江路P座 210730 [“储平”, “洪红霞”, “沈玉英”, “王洁”, “董玉英”, “邓凤英”, “谢海燕”, “梁雷”, “张畅”, “任玉兰”, “贾宇”, “汪成”, “孙琴”, “纪红梅”, “王欣”, “陈兵”, “张成”, “王东”, “谷霞”, “林成”]
普陀武街f座 706352 [“邢成”, “张建军”, “张鑫”, “戴涛”, “蔡洋”, “汪燕”, “尹亮”, “何利”, “何玉”, “周波”, “金秀珍”, “杨波”, “张帅”, “周柳”, “马云”, “张建华”, “王丽丽”, “陈丽”, “万萍”]
城东贵阳街O座 110567 [“李洁”, “陈静”, “王建国”, “方淑华”, “古想”, “漆萍”, “詹桂花”, “王成”, “李慧”, “孙娜”, “马伟”, “谢杰”, “王鹏”, “鞠桂英”, “莫桂英”, “汪雷”, “黄彬”, “李玉梅”, “祝红梅”]

现在,我们在图上可视化这个结果看看:

cypher

MATCH (p:)-[`visit0`:到访]->(`addr`:地址)<-[`visit1`:到访]-(p1:)
    WHERE id(p) == "p_101" AND `visit0`.`start_time` < `visit1`.`end_time` AND `visit0`.`end_time` > `visit1`.`start_time`
    RETURN paths;

结果中我们标记了 p_101 为不同的图标,在用标签传播算法识别一下聚集社区,是不是一图胜千言呢?

time_and_space

9.4 最近去过的省份

最后,我们再用简单的查询模式表达出一个人在给定时间内,比如从一个时间点开始,到访过的所有省份

cypher

MATCH (p:)-[visit:到访]->(`addr`:地址)-[:属于*5]-(province:省份)
    WHERE id(p) == "p_101" AND visit.start_time > 1625469000
    RETURN province.省份.name, collect(addr.地址.name);

看起来他/她去过不少地方呢:

province.省份.name collect(addr.地址.name)
四川省 [“闵行仇路q座 255960”]
山东省 [“城东贵阳街O座 110567”]
云南省 [“丰都北京路J座 725701”]
福建省 [“普陀潜江路P座 210730”]
内蒙古自治区 [“普陀武街f座 706352”]

老轨迹,我们在图上看看这个结果吧,这次,我们选择 Dagre-LR 这个布局渲染,结果是不是非常清晰呢?

visited_provinces

10 总结

我们给出了不少社交网络里的应用案例,包括:

  • 查找关键的人
  • 识别聚集的人群、社群
  • 判定两个用户之间的亲密度
  • 推荐新朋友
  • 利用共同邻居精准推送重要内容
  • 根据好友关系、地理位置推送信息流
  • 利用时空关系图谱查询人与人之间关系、获取时空相交的人、访问过的省份

社交网络作为天然的图结构,非常适合用图的技术来存储、查询、计算、分析与可视化去解决其上的各式各样的问题,NebulaGraph 的强大处理能力和可视化能力使得我们已知很多公司在使用它作为社交领域的图存储、计算层,这其中包括:网易游戏、微信、Line、Soul、快手和知乎等等很多行业领先的团队,希望大家通过本章能对社交领域的图技术应有有一个初步的认识。

题图版权:by Ryoji

基于开源技术栈的数据血缘、治理参考解决方案

2022年11月25日 16:20

也许我们没有必要从头在 NebulaGraph 上搭建自己的数据血缘项目,本文分享如何用开源、现代的 DataOps、ETL、Dashboard、元数据、数据血缘管理系统构建大数据治理基础设施

1 元数据治理系统

元数据治理系统是一个提供了所有数据在哪、它们的格式化方式、生成、转换、依赖、呈现和所属的一站式视图

元数据治理系统是所有数据仓库、数据库、表、仪表板、ETL 作业等的目录接口(catalog),有了它,我们就不用在群里喊“大家好,我可以更改这个表的 schema 吗?”, “请问谁知道我如何找到 table-view-foo-bar 的原始数据?”,一个成熟的数据治理方案中的元数据治理系统,对成规模的数据团队来说非常必要。

对于另一个词:数据血缘则是众多需要管理的元数据之一,例如,某些 Dashboard 是 某一个 Table View 的下游,而这个 Table View 又是从另外两个上游的表 JOIN 而来两。 我们显然应该清晰的掌握、管理这些信息,去构建一个可信、可控的系统和数据质量控制体系。

2 参考解决方案

2.1 方案的动机

元数据和数据血缘本质上非常适合图数据建模、图数据库的场景。这里典型的查询就是面向图关系的查询了,像“查找每个给定组件(即表)的所有 n 深度数据血缘”就是一个 NebulaGraph 中的FIND ALL PATH 查询。

作为 NebulaGraph 社区中的一员,我发现人们在论坛、群里讨论的查询和图建模总能看出来很多人都在 NebulaGraph 上从头搭建自己的数据血缘系统,而这些工作看起来大多数都是重复造轮子(而且轮子并不容易造)。

我们来看看这样的元数据治理系统的轮子里,都需要那些功能组件:

  • 元数据 extractor
    • 这部分需要从数据栈的不同方(如数据库、数仓、Dashboard,甚至从 ETL Pipeline 和应用、服务等等)中以拉或者推的方式获取。
  • 元数据存储
    • 可以存在数据库、图数据库里,或者有时候存成超大的 JSON manifest 文件都行
  • 元数据目录接口系统(Catalog)
    • 提供 API 和/或 GUI 界面以读取/写入元数据和数据血缘的系统

在 NebulaGraph 社区中,我看到不少人因为提问的查询和建模中明显有数据血缘的痕迹,意识到大家都在从头搭建数据血缘系统。考虑到系统中元数据的提取对象都是从各种知名数据库、数仓、最终的需求也大相径庭,这种重复的开发、研究、探索是一种大大的浪费。

所以,我准备搭建一个能够启发大家的参考数据血缘、治理方案,利用到市面上最好的开源项目。希望能让打算在 NebulaGraph 上定义和迭代自己的 Graph Model 并创建内部元数据和 pipeline 的人可以从这个项目中受益,从而拥有一个相对完善、设计精美的开箱即用的元数据治理系统,和相对更完善的图模型。

我尽量把这个方案做的完备、端到端(不只有元数据管理),希望也能为考虑做基于图做数据治理的新手一些启发和参考。

下图是整个方案的简单示意图:

其中上方是元数据的来源与导入、下方是元数据的存储与展示、发现。

diagram-of-ref-project

2.2 技术栈介绍

下边介绍一下其中的每一部分。

2.2.1 数据库和数仓

为了处理和使用原始和中间数据,这里一定涉及至少一个数据库或者数仓。

它可以是 Hive、Apache Delta、TiDB、Cassandra、MySQL 或 Postgres,在这个参考项目中,我们选一个简单、流行的 Postgres。

✅ - 数据仓库:Postgres

2.2.2 数据运维 DataOps

我们应该有某种 DataOps 的方案,让 Pipeline 和环境具有可重复性、可测试性和版本控制性。

在这里,我们使用了 GitLab 创建的 Meltano

Meltano 是一个 just-work 的 DataOps 平台,它以一种神奇而优雅的方式将 Singer 作为 EL 和 dbt 作为 T 连接起来,它还连接到其他一些 dataInfra 实用程序,例如 Apache Superset 和 Apache Airflow 等。

至此,我们又纳入了一个成员:

✅ - GitOps:Meltano https://gitlab.com/meltano/meltano

2.2.3 ETL

如前边提到,我们还利用 Singer 与 Meltano 一起将来自许多不同数据源的数据 E(提取)和 L(加载)数据目标,并使用 dbt 作为 Transform 的平台。

✅ - EL:Singer https://singer.io/

✅ - T: dbt https://getdbt.com/

2.2.4 数据可视化

在数据之上创建 Dashboard、图表和表格来获得洞察是很直接的需求(可以想象为想象大数据之上的 excel 图标功能)。

Apache Superset 是我很喜欢的开源数据可视化项目,我准备用它来作为被治理管理的目标之一,同时,也会利用它的可视化作为元数据洞察功能的一部分。

✅ - Dashboard:Apache Superset https://superset.apache.org/

2.2.5 任务编排(DAG Job Orchestration)

在大多数情况下,我们的 DataOps 作业、任务会增长到需要一个编排系统的规模,我们可以用 Apache Airflow 来负责这一块。

✅ - DAG:Apache Airflow https://airflow.apache.org/

2.2.6 元数据治理

随着越来越多的组件和数据被引入数据基础设施,在数据库、表、数据建模(schema)、Dashboard、DAG(编排系统中的有向无环图)、应用与服务的所有生命周期中都将存在海量的元数据,需要对它们的管理员和团队进行协同管理、连接和发现。

Linux Foundation Amundsen 是我认为可以解决这个问题的最佳项目之一。

✅ - 数据发现:Linux Foundation Amundsen https://www.amundsen.io/amundsen/

Amundsen 用图数据库为事实源(single source of truth)以加速多跳查询,Elastic Search 为全文搜索引擎,它能对所有元数据及其血缘进行了顺滑的处理还提供了优雅的 UI 和 API。

Amundsen 支持多种图数据库为后端,这里咱们用 NebulaGraph

✅ - 全文搜索:Elastic Search

✅ - 图数据库:NebulaGraph

现在,所有组件都齐活了,开始组装它们吧。

3 环境搭建与各组件初识

整个项目方案都是开源的,大家可以在这里找到它的所有细节:

整个项目大家的实验中我遵循尽量干净、鼓励的原则,需要假设在一个 unix-like 的系统上运行,有互联网和 Docker-Compose。

注:参考 https://docs.docker.com/compose/install/ 在继续之前安装 Docker 和 Docker Compose。

这里我们在 Ubuntu 20.04 LTS X86_64 上运行它,但在其他发行版或 Linux 版本上应该也没有问题。

3.1 运行一个数仓、数据库

首先,安装 Postgres 作为我们的数仓。

这个单行命令会创建一个使用 docker 在后台运行的 Postgres,进程关闭之后容器不会残留而是被清理掉(因为参数--rm)。

bash

docker run --rm --name postgres \
    -e POSTGRES_PASSWORD=lineage_ref \
    -e POSTGRES_USER=lineage_ref \
    -e POSTGRES_DB=warehouse -d \
    -p 5432:5432 postgres

然后我们可以使用 Postgres CLI 或 GUI 客户端来验证它。

提示:可以用 VS Code 插件:SQL TOOLS 快速以 GUI 方式连接到数据库(支持 MariaDB、Postgres 、Cassandra 等)

https://marketplace.visualstudio.com/items?itemName=mtxr.sqltools

3.2 DataOps 工具链部署

然后,安装有机结合了 Singler 和 dbt 的 Meltano。

Meltano 帮助我们管理 ETL 工具(作为插件)及其所有配置和 pipeline。 这些元信息位于 meltano 配置及其系统数据库(https://docs.meltano.com/concepts/project#system-database)中,其中配置是基于文件的(可以使用 GitOps 管理),它的默认系统数据库是 SQLite。

3.2.1 安装 Meltano

使用 Meltano 的工作流是启动一个“meltano 项目”并开始将 E、L 和 T 添加到配置文件中。 项目的启动只需要一个 CLI 命令调用:meltano init yourprojectname,在那之前,可以先用 Python 的包管理器:pip 或者 Docker 镜像安装 Meltano:

  • 在 python 虚拟环境中使用 pip 安装 Meltano:

bash

mkdir .venv
# example in a debian flavor Linux distro
sudo apt-get install python3-dev python3-pip python3-venv python3-wheel -y
python3 -m venv .venv/meltano
source .venv/meltano/bin/activate
python3 -m pip install wheel
python3 -m pip install meltano

# init a project
mkdir meltano_projects && cd meltano_projects
# replace <yourprojectname> with your own one
touch .env
meltano init <yourprojectname>
  • 或者用容器安装 Meltano:

bash

docker pull meltano/meltano:latest
docker run --rm meltano/meltano --version

# init a project
mkdir meltano_projects && cd meltano_projects

# replace <yourprojectname> with your own one
touch .env
docker run --rm -v "$(pwd)":/projects \
             -w /projects --env-file .env \
             meltano/meltano init <yourprojectname>

除了 meltano init,还有一些其他命令,例如 meltano etl 表示 ETL 的执行,还有 meltano invoke <plugin> 来调用插件命令,详细可以参考它的速查表(https://docs.meltano.com/reference/command-line-interface)。

3.2.2 Meltano GUI 界面

Meltano 还带有一个基于 Web 的 UI,执行 ui 子命令就是启动它:

bash

meltano ui

默认他会跑在 http://localhost:5000 上。

对于 Docker 运行的情况,只需要在暴露 5000 端口的情况下运行容器即可,由于容器的默认命令已经是 meltano ui,所以 run 的命令只需:

bash

docker run -v "$(pwd)":/project \
             -w /project \
             -p 5000:5000 \
             meltano/meltano

3.2.3 Meltano 项目示例

写到这里的时候,我注意到 Pat Nadolny 创建了很好的示例项目在 https://github.com/pnadolny13/meltano_example_implementations/tree/main/meltano_projects/singer_dbt_jaffle,它利用 dbt 的 Meltano 示例数据集,采用 Airflow 编排 ETL 任务(https://github.com/pnadolny13/meltano_example_implementations/tree/main/meltano_projects/dbt_orchestration,还有利用 Superset 的例子(https://github.com/pnadolny13/meltano_example_implementations/tree/main/meltano_projects/jaffle_superset)。

这里,我就不重复造轮子了,直接利用他的例子吧。

咱们可以参照 https://github.com/pnadolny13/meltano_example_implementations/tree/main/meltano_projects/singer_dbt_jaffle,运行这样的数据管道(pipeline):

  • tap-CSV(Singer),从 CSV 文件中提取数据
  • target-postgres(Singer),将数据加载到 Postgres
  • dbt,将数据转换为聚合表或视图

注意,前边我们已经启动了 postgres,那一步可以跳过。

操作过程是:

bash

git clone https://github.com/pnadolny13/meltano_example_implementations.git
cd meltano_example_implementations/meltano_projects/singer_dbt_jaffle/

meltano install
touch .env
echo PG_PASSWORD="lineage_ref" >> .env
echo PG_USERNAME="lineage_ref" >> .env

# Extract and Load(with Singer)
meltano run tap-csv target-postgres

# Trasnform(with dbt)
meltano run dbt:run

# Generate dbt docs
meltano invoke dbt docs generate

# Serve generated dbt docs
meltano invoke dbt docs to serve

# Then visit http://localhost:8080

现在,我们可以连接到 Postgres 来查看 加载和转换后的数据预览如下,截图来自 VS Code 的 SQLTool:

Payments 表里长这样子:

3.3 搭一个 BI Dashboard 系统

现在,我们有了数据仓库中的一些数据,用 ETL 工具链将不同的数据源导了进去,接下来可以试着用一下这些数据了。

像仪表大盘 Dashbaord 这样的 BI 工具能帮助我们从数据中获得有用的洞察,使用 Apache Superset,可以很容易地创建和管理基于这些数据源的 Dashboard 和各式各样的图表。

本章的重点不在于 Apache Superset 本身,所以,咱们还是复用 Pat Nadolny 在的例子 https://github.com/pnadolny13/meltano_example_implementations/tree/main/meltano_projects/jaffle_superset

3.3.1 Bootstrap Meltano 和 Superset

创建一个安装了 Meltano 的 python venv:

bash

mkdir .venv
python3 -m venv .venv/meltano
source .venv/meltano/bin/activate
python3 -m pip install wheel
python3 -m pip install meltano

参考 Pat 的 Guide(https://github.com/pnadolny13/meltano_example_implementations/tree/main/meltano_projects/jaffle_superset),稍微做一些修改:

  • 克隆 repo,进入 jaffle_superset 项目

bash

git clone https://github.com/pnadolny13/meltano_example_implementations.git
cd meltano_example_implementations/meltano_projects/jaffle_superset/
  • 修改meltano配置文件,让 Superset 连接到我们创建的 Postgres:

bash

vim meltano_projects/jaffle_superset/meltano.yml

这里,我将主机名更改为“10.1.1.111”,这是我当前主机的 IP,而如果读者在 Windows 或者 macOS 机器的 Docker Desktop 上跑的话,这里不要修改,否则要参考我去改成自己实际的地址:

diff

--- a/meltano_projects/jaffle_superset/meltano.yml
+++ b/meltano_projects/jaffle_superset/meltano.yml
@@ -71,7 +71,7 @@ plugins:
               A list of database driver dependencies can be found here https://superset.apache.org/docs/databases/installing-database-drivers
     config:
       database_name: my_postgres
-      sqlalchemy_uri: postgresql+psycopg2://${PG_USERNAME}:${PG_PASSWORD}@host.docker.internal:${PG_PORT}/${PG_DATABASE}
+      sqlalchemy_uri: postgresql+psycopg2://${PG_USERNAME}:${PG_PASSWORD}@10.1.1.168:${PG_PORT}/${PG_DATABASE}
       tables:
       - model.my_meltano_project.customers
       - model.my_meltano_project.orders
  • 添加 Postgres 登录的信息到 .env 文件:

bash

echo PG_USERNAME=lineage_ref >> .env
echo PG_PASSWORD=lineage_ref >> .env
  • 安装 Meltano 项目,运行 ETL 任务

bash

meltano install
meltano run tap-csv target-postgres dbt:run
  • 调用、启动 superset,这里注意 ui 不是 meltano 的内部命令,而是一个配置进去的自定义行为(user-defined action)

bash

meltano invoke superset:ui
  • 在另一个命令行终端,执行另一个自定义的命令 load_datasources

text

meltano invoke superset:load_datasources
  • 通过浏览器访问 http://localhost:8088/ 就是Superset 的图形界面了:

3.3.2 创建一个 Dashboard

试一下在这个 Meltano 项目中定义的 Postgres 中的 ETL 数据上创建一个 Dashboard 吧

  • 点击 + DASHBOARD,填写仪表盘名称,然后点击 SAVE,然后点击 + CREATE A NEW CHART

  • 在新图表(Create a new chart)视图中,我们应该选择图表类型和数据集。 在这里,我选择了 orders 表作为数据源和 Pie Chart 图表类型:

  • 点击“CREATE NEW CHART”后,我们在图表定义视图中,我选择了“status”的“Query”为“DIMENSIONS”,“COUNT(amount)”为“METRIC”。 至此,咱们就可以看到每个订单状态分布的饼图了。

  • 点击 SAVE ,它会询问应该将此图表添加到哪个 Dashboard,选择后,单击 SAVE & GO TO DASHBOARD

  • 然后,在 Dashboard 中,我们可以看到那里的所有图表。 您可以看到我还添加了另一个图表来显示客户订单数量分布:

  • ··· 的话,还能看到刷新率设置、下载渲染图等其他的功能。

目前,我们有一个简单但典型的 homelab 数据技术栈了,并且所有东西都是开源的!

想象一下,我们在 CSV 中有 100 个数据集,在数据仓库中有 200 个表,并且有几个数据工程师在运行不同的项目,这些项目使用、生成不同的应用与服务、Dashbaord 和数据库。 当有人想要查找、发现或者修改其中的一些表、数据集、Dashbaord 和管道,在沟通和工程方面可能都是非常不好管理的。

如前边提到的,我们需要这个示例项目的主要部分:元数据发现系统。

3.4 元数据发现系统

然后,我们部署一个带有 NebulaGraph 和 Elasticsearch 的 Amundsen。

注:目前【NebulaGraph 作为 Amundsen 后端的 PR】(https://github.com/amundsen-io/amundsen/pull/1817)尚未合并,我还在与 Amundsen 团队合作(https://github.com/amundsen-io/rfcs/pull/48)来实现它。

有了 Amundsen,我们可以在一个地方发现和管理整个数据栈中的所有元数据。

Amundsen 主要有两个部分组成:

它的工作原理是:利用 Data builder 从不同来源提取元数据,并将元数据持久化到 Meta service 的后端存储和 Search service 的后端存储中,用户从 Froent service 或通过 Meta Service 的API。

3.4.1 部署 Amundsen

3.4.1.1 元数据服务 Metadata service

我们用 docker-compose 文件部署一个 Amundsen 集群。 由于 NebulaGraph 后端支持尚未合并,还不能用官方的代码,先用我自己的分叉版本。

首先,让我们克隆包含所有子模块的 repo:

bash

git clone -b amundsen_nebula_graph --recursive git@github.com:wey-gu/amundsen.git
cd amundsen

然后,启动所有目录服务(catalog services)及其后端存储:

bash

docker-compose -f docker-amundsen-nebula.yml up

注:可以添加 -d 来让容器在后台运行:

bash

docker-compose -f docker-amundsen-nebula.yml up -d

关闭后台运行的集群

bash

docker-compose -f docker-amundsen-nebula.yml stop

删除后台运行的集群

bash

docker-compose -f docker-amundsen-nebula.yml down

由于这个 docker-compose 文件是供开发人员试玩、调试 Amundsen 用的,而不是给生产部署准备的,它在启动的时候会从代码库构建镜像,第一次跑的时候启动会慢一些。

部署好了之后,我们使用 Data builder 将一些示例、虚构的数据加载存储里。

3.4.1.2 抓取元数据 Data builder

Amundsen Data builder 就像 Meltano 系统一样,只不过是用在元数据的上的 ETL ,它把元数据加载到“Meta service”和“Search service”的后端存储:NebulaGraph 和 Elasticsearch 里。 这里的 Data builder 只是一个 python 模块,所有的元数据 ETL 作业可以作为脚本运行,也可以用 Apache Airflow 等 DAG 平台进行编排。

安装 Amundsen Data builder

bash

cd databuilder
python3 -m venv .venv
source .venv/bin/activate
python3 -m pip install wheel
python3 -m pip install -r requirements.txt
python3 setup.py install

调用这个示例数据构建器 ETL 脚本来把示例的虚拟数据导进去。

bash

python3 example/scripts/sample_data_loader_nebula.py
3.4.1.3 验证一下 Amundsen

在访问 Amundsen 之前,我们需要创建一个测试用户:

bash

# run a container with curl attached to amundsenfrontend
docker run -it --rm --net container:amundsenfrontend nicolaka/netshoot

# Create a user with id test_user_id
curl -X PUT -v http://amundsenmetadata:5002/user \
    -H "Content-Type: application/json" \
    --data \
    '{"user_id":"test_user_id","first_name":"test","last_name":"user", "email":"test_user_id@mail.com"}'

exit

然后我们可以在 http://localhost:5000 查看 UI 并尝试搜索 test,它应该会返回一些结果。

然后,可以单击并浏览在“sample_data_loader_nebula.py”期间加载到 Amundsen 的那些示例元数据。

此外,我们还可以通过 NebulaStudio(http://localhost:7001) 访问 NebulaGraph 里的这些数据。

注意在 Nebula Studio 中,默认登录字段为:

  • 主机:graphd:9669
  • 用户:root
  • 密码:nebula

下图显示了有关 Amundsen 组件的更多详细信息:

asciiarmor

       ┌────────────────────────┐ ┌────────────────────────────────────────┐
        Frontend:5000            Metadata Sources                       
       ├────────────────────────┤  ┌────────┐ ┌─────────┐ ┌─────────────┐ 
        Metaservice:5001                                          
        ┌──────────────┐          Foo DB   Bar App   X Dashboard  
  ┌────┼─┤ Nebula Proxy                                           
       └──────────────┘                                          
      ├────────────────────────┤  └────────┘ └─────┬───┘ └─────────────┘ 
┌─┼────┤ Search searvice:5002                                           
     └────────────────────────┘ └──────────────────┼─────────────────────┘
     ┌─────────────────────────────────────────────┼───────────────────────┐
                                                                         
      Databuilder     ┌───────────────────────────┘                       
                                                                         
      ┌───────────────▼────────────────┐ ┌──────────────────────────────┐ 
  ┌──┼─► Extractor of Sources           ├─► nebula_search_data_extractor  
     └───────────────┬────────────────┘ └──────────────┬───────────────┘ 
     ┌───────────────▼────────────────┐ ┌──────────────▼───────────────┐ 
      Loader filesystem_csv_nebula     Loader Elastic FS loader      
     └───────────────┬────────────────┘ └──────────────┬───────────────┘ 
     ┌───────────────▼────────────────┐ ┌──────────────▼───────────────┐ 
      Publisher nebula_csv_publisher   Publisher Elasticsearch       
     └───────────────┬────────────────┘ └──────────────┬───────────────┘ 
    └─────────────────┼─────────────────────────────────┼─────────────────┘
  └────────────────┐                                    
     ┌─────────────┼───►─────────────────────────┐ ┌─────▼─────┐
      Nebula Graph                                        
 └────┼─────┬───────┴───┼───────────┐     ┌─────┐             
                                      MetaD             
       ┌───▼──┐    ┌───▼──┐    ┌───▼──┐  └─────┘             
 ┌────┼─►GraphD    GraphD    GraphD                      
      └──────┘    └──────┘    └──────┘  ┌─────┐             
      :9669                             MetaD    Elastic  
      ┌────────┐ ┌────────┐ ┌────────┐  └─────┘    Search   
                                             Cluster  
      StorageD StorageD StorageD  ┌─────┐    :9200    
                                  MetaD             
      └────────┘ └────────┘ └────────┘  └─────┘             
     ├───────────────────────────────────────────┤            
 └────┤ Nebula Studio:7001                                    
      └───────────────────────────────────────────┘ └─────▲─────┘
└──────────────────────────────────────────────────────────┘

4 穿针引线:元数据发现

设置好基本环境后,让我们把所有东西穿起来。还记得我们有 ELT 一些数据到 PostgreSQL 吗?

那么,我们如何让 Amundsen 发现有关这些数据和 ETL 的元数据呢?

4.1 提取 Postgres 元数据

我们从数据源开始:首先是 Postgres。

我们为 python3 安装 Postgres 客户端:

bash

sudo apt-get install libpq-dev
pip3 install Psycopg2

4.1.1 执行 Postgres 元数据 ETL

运行一个脚本来解析 Postgres 元数据:

bash

export CREDENTIALS_POSTGRES_USER=lineage_ref
export CREDENTIALS_POSTGRES_PASSWORD=lineage_ref
export CREDENTIALS_POSTGRES_DATABASE=warehouse

python3 example/scripts/sample_postgres_loader_nebula.py

If you look into the code of the sample script for loading Postgres metadata to Nebula, the main lines are quite straightforward:

我们看看把 Postgres 元数据加载到 NebulaGraph 的示例脚本的代码,非常简单直接:

python

# part 1: PostgressMetadata --> CSV --> Nebula Graph
job = DefaultJob(
      conf=job_config,
      task=DefaultTask(
          extractor=PostgresMetadataExtractor(),
          loader=FsNebulaCSVLoader()),
      publisher=NebulaCsvPublisher())

...
# part 2: Metadata stored in NebulaGraph --> Elasticsearch
extractor = NebulaSearchDataExtractor()
task = SearchMetadatatoElasticasearchTask(extractor=extractor)

job = DefaultJob(conf=job_config, task=task)

第一个工作路径是:PostgressMetadata --> CSV --> Nebula Graph

  • PostgresMetadataExtractor 用于从 Postgres 中提取/提取元数据,可以参考文档(https://www.amundsen.io/amundsen/databuilder/#postgresmetadataextractor)。
  • FsNebulaCSVLoader 用于将提取的数据中间放置为 CSV 文件
  • NebulaCsvPublisher 用于将元数据以 CSV 的形式发布到 NebulaGraph

第二个工作路径是:Metadata stored in NebulaGraph --> Elasticsearch

  • NebulaSearchDataExtractor 用于获取存储在 Nebula Graph 中的元数据
  • SearchMetadatatoElasticasearchTask 用于使 Elasticsearch 对元数据进行索引。

请注意,在生产环境中,我们可以在脚本中或使用 Apache Airflow 等编排平台触发这些作业。

4.1.2 验证 Postgres 中元数据的获取

搜索payments或者直接访问http://localhost:5000/table_detail/warehouse/postgres/public/payments,你可以看到我们 Postgres 的元数据,比如:

然后,像上面的屏幕截图一样,可以轻松完成元数据管理操作,如添加标签、所有者和描述。

4.2 提取 dbt 元数据

实际上,我们也可以从 dbt 本身提取元数据。

Amundsen DbtExtractor 会解析 catalog.jsonmanifest.json 文件以将元数据加载到 Amundsen 存储(NebulaGraph 和 Elasticsearch )。

在上面的 meltano 章节中,我们已经使用 meltano invoke dbt docs generate 生成了这个文件:

log

14:23:15  Done.
14:23:15  Building catalog
14:23:15  Catalog written to /home/ubuntu/ref-data-lineage/meltano_example_implementations/meltano_projects/singer_dbt_jaffle/.meltano/transformers/dbt/target/catalog.json

4.2.1 dbt 元数据 ETL 的执行

我们试着解析示例 dbt 文件中的元数据吧:

bash

$ ls -l example/sample_data/dbt/
total 184
-rw-rw-r-- 1 w w   5320 May 15 07:17 catalog.json
-rw-rw-r-- 1 w w 177163 May 15 07:17 manifest.json

我写的这个示例的加载例子如下:

bash

python3 example/scripts/sample_dbt_loader_nebula.py

其中主要的代码如下:

python

# part 1: Dbt manifest --> CSV --> Nebula Graph
job = DefaultJob(
      conf=job_config,
      task=DefaultTask(
          extractor=DbtExtractor(),
          loader=FsNebulaCSVLoader()),
      publisher=NebulaCsvPublisher())

...
# part 2: Metadata stored in NebulaGraph --> Elasticsearch
extractor = NebulaSearchDataExtractor()
task = SearchMetadatatoElasticasearchTask(extractor=extractor)

job = DefaultJob(conf=job_config, task=task)

它和 Postgres 元数据 ETL 的唯一区别是 extractor=DbtExtractor(),它带有以下配置以获取有关 dbt 项目的以下信息:

  • 数据库名称
  • 目录_json
  • manifest_json

python

job_config = ConfigFactory.from_dict({
  'extractor.dbt.database_name': database_name,
  'extractor.dbt.catalog_json': catalog_file_loc,  # File
  'extractor.dbt.manifest_json': json.dumps(manifest_data),  # JSON Dumped objecy
  'extractor.dbt.source_url': source_url})

4.2.2 验证 dbt 抓取结果

搜索 dbt_demo 或者直接访问 http://localhost:5000/table_detail/dbt_demo/snowflake/public/raw_inventory_value,可以看到

小提示:我们可以选择启用 DEBUG log 级别去看已发送到 Elasticsearch 和 NebulaGraph 的内容。

diff

- logging.basicConfig(level=logging.INFO)
+ logging.basicConfig(level=logging.DEBUG)

或者,在 NebulaStudio 中探索导入的数据:

首先,点击 “Start with Vertices”,填写顶点 vid:snowflake://dbt_demo.public/fact_warehouse_inventory

然后,我们可以看到顶点显示为粉红色的点。 让我们修改 Expand / ”拓展“选项:

  • 方向:双向
  • 步数:单向、三步

并双击顶点(点),它将双向拓展 3 步:

从上边这个截图里我们可以发现,在可视化之后的图数据库中,这些元数据可以很容易被查看、分析,并从中获得洞察。

小贴士,您可以点击 👁 图标选择一些要显示的属性,我在截图之前就是通过它让一些信息显示出来的。

而且,我们在 NebulaStudio 中看到的也与 Amundsen 元数据服务的数据模型相呼应:

最后,请记住我们曾利用 dbt 来转换meltano 中的一些数据,并且清单文件路径是.meltano/transformers/dbt/target/catalog.json,您可以尝试创建一个数据构建器作业来导入它。

4.3 提取 Superset 中的元数据

Amundsen 的 Superset extractor 可以获取

咱们现在就尝试摄取之前创建的 Superset Dashboard 的元数据。

4.3.1 Superset 元数据 ETL 的执行

下边执行的示例 Superset 提取脚本可以从中获取数据并将元数据加载到 NebulaGraph 和 Elasticsearch 中。

python

python3 sample_superset_data_loader_nebula.py

如果我们将日志记录级别设置为“DEBUG”,我们实际上可以看到这些中间的过程日志:

python

# fetching metadata from superset
DEBUG:urllib3.connectionpool:http://localhost:8088 "POST /api/v1/security/login HTTP/1.1" 200 280
INFO:databuilder.task.task:Running a task
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): localhost:8088
DEBUG:urllib3.connectionpool:http://localhost:8088 "GET /api/v1/dashboard?q=(page_size:20,page:0,order_direction:desc) HTTP/1.1" 308 374
DEBUG:urllib3.connectionpool:http://localhost:8088 "GET /api/v1/dashboard/?q=(page_size:20,page:0,order_direction:desc) HTTP/1.1" 200 1058
...

# insert Dashboard

DEBUG:databuilder.publisher.nebula_csv_publisher:Query: INSERT VERTEX `Dashboard` (`dashboard_url`, `name`, published_tag, publisher_last_updated_epoch_ms) VALUES  "superset_dashboard://my_cluster.1/3":("http://localhost:8088/superset/dashboard/3/","my_dashboard","unique_tag",timestamp());
...

# insert a DASHBOARD_WITH_TABLE relationship/edge

INFO:databuilder.publisher.nebula_csv_publisher:Importing data in edge files: ['/tmp/amundsen/dashboard/relationships/Dashboard_Table_DASHBOARD_WITH_TABLE.csv']
DEBUG:databuilder.publisher.nebula_csv_publisher:Query:
INSERT edge `DASHBOARD_WITH_TABLE` (`END_LABEL`, `START_LABEL`, published_tag, publisher_last_updated_epoch_ms) VALUES "superset_dashboard://my_cluster.1/3"->"postgresql+psycopg2://my_cluster.warehouse/orders":("Table","Dashboard","unique_tag", timestamp()), "superset_dashboard://my_cluster.1/3"->"postgresql+psycopg2://my_cluster.warehouse/customers":("Table","Dashboard","unique_tag", timestamp());

4.3.2 验证 Superset Dashboard 元数据

通过在 Amundsen 中搜索它,我们现在可以获得 Dashboard 信息。

我们也可以从 NebulaStudio 进行验证。

注:可以参阅 Dashboard 抓取指南 中的 Amundsen Dashboard 图建模:

dashboard_graph_modeling

4.4 用 Superset 预览数据

Superset可以用来预览这样的表格数据。 相应的文档可以参考 https://www.amundsen.io/amundsen/frontend/docs/configuration/#preview-client ,其中 /superset/sql_json/ 的 API 被 Amundsen Frontend service 调用,取得预览信息。

4.5 开启数据血缘信息

默认情况下,数据血缘是关闭的,我们可以通过以下方式启用它:

  1. cd 到 Amundsen 代码仓库下,这也是我们运行 docker-compose -f docker-amundsen-nebula.yml up 命令的地方

bash

cd amundsen
  1. 修改 frontend 下的 typescript 配置

diff

--- a/frontend/amundsen_application/static/js/config/config-default.ts
+++ b/frontend/amundsen_application/static/js/config/config-default.ts
   tableLineage: {
-    inAppListEnabled: false,
-    inAppPageEnabled: false,
+    inAppListEnabled: true,
+    inAppPageEnabled: true,
     externalEnabled: false,
     iconPath: 'PATH_TO_ICON',
     isBeta: false,
  1. 重新构建 docker 镜像,其中将重建前端图像。

bash

docker-compose -f docker-amundsen-nebula.yml build

然后,重新运行 up -d 以确保前端用新的配置:

bash

docker-compose -f docker-amundsen-nebula.yml up -d

结果大概长这样子:

bash

$ docker-compose -f docker-amundsen-nebula.yml up -d
...
Recreating amundsenfrontend           ... done

之后,我们可以访问 http://localhost:5000/lineage/table/gold/hive/test_schema/test_table1 看到 Lineage (beta) 血缘按钮已经显示出来了:

我们可以点击 Downstream 在存在的时候查看该表的下游资源:

或者点血缘按钮查看血缘的图表式:

也有用于血缘查询的 API。 这个例子中我们用 cURL 调用下这个 API:

bash

docker run -it --rm --net container:amundsenfrontend nicolaka/netshoot

curl "http://amundsenmetadata:5002/table/snowflake://dbt_demo.public/raw_inventory_value/lineage?depth=3&direction=both"

上面的 API 调用是查询上游和下游方向的 linage,表 snowflake://dbt_demo.public/raw_inventory_value 的深度为 3。

结果应该是这样的:

json

{
    "depth": 3,
    "downstream_entities": [
        {
            "level": 2,
            "usage": 0,
            "key": "snowflake://dbt_demo.public/fact_daily_expenses",
            "parent": "snowflake://dbt_demo.public/fact_warehouse_inventory",
            "badges": [],
            "source": "snowflake"
        },
        {
            "level": 1,
            "usage": 0,
            "key": "snowflake://dbt_demo.public/fact_warehouse_inventory",
            "parent": "snowflake://dbt_demo.public/raw_inventory_value",
            "badges": [],
            "source": "snowflake"
        }
    ],
    "key": "snowflake://dbt_demo.public/raw_inventory_value",
    "direction": "both",
    "upstream_entities": []
}

实际上,这个血缘数据就是在我们的 DbtExtractor 执行期间提取和加载的,其中 extractor .dbt.{DbtExtractor.EXTRACT_LINEAGE} 默认为 True,因此创建了血缘元数据并将其加载到了 Amundsen。

4.5.1 在 NebulaGraph 中洞察血缘

使用图数据库作为元数据存储的两个优点是:

  • 图查询本身是一个灵活的 DSL for lineage API,例如,这个查询帮助我们执行 Amundsen 元数据 API 的等价的查询:

cypher

MATCH p=(t:`Table`) -[:`HAS_UPSTREAM`|:`HAS_DOWNSTREAM` *1..3]->(x)
WHERE id(t) == "snowflake://dbt_demo.public/raw_inventory_value" RETURN p
  • 我们现在甚至可以在 NebulaGraph Studio 或者 Explorer 的控制台中查询它

​ 然后渲染这个结果:

4.5.2 提取数据血缘

这些血缘信息是需要我们明确指定、获取的,获取的方式可以是自己写的 extractor,也可以是一些已经有的方式。比如 dbt 的 extractor和 Open Lineage 项目的 Amundsen extractor。

4.5.2.1 通过 dbt

这个在刚才已经展示过了,Dbt 的 Extractor 会从表级别获取血缘和其他 dbt 中产生的元数据信息一起被拿到。

4.5.2.2 通过 Open Lineage

Amundsen 中的另一个开箱即用的血缘 Extractor 是 OpenLineageTableLineageExtractor

Open Lineage 是一个开放的框架,可以将不同来源的血统数据收集到一个地方,它可以将血统信息输出为 JSON 文件:https://www.amundsen.io/amundsen/databuilder/#openlineagetablelineageextractor

下边是它的 Amundsen data builder 例子:

python

dict_config = {
    # ...
    f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.CLUSTER_NAME}': 'datalab',
    f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.OL_DATASET_NAMESPACE_OVERRIDE}': 'hive_table',
    f'extractor.openlineage_tablelineage.{OpenLineageTableLineageExtractor.TABLE_LINEAGE_FILE_LOCATION}': 'input_dir/openlineage_nd.json',
}
...

task = DefaultTask(
    extractor=OpenLineageTableLineageExtractor(),
    loader=FsNebulaCSVLoader())

5 回顾

整套元数据治理/发现的方案思路如下:

  • 将整个数据技术栈中的组件作为元数据源(从任何数据库、数仓,到 dbt、Airflow、Openlineage、Superset 等各级项目)
  • 使用 Databuilder(作为脚本或 DAG)运行元数据 ETL,以使用 NebulaGraph 和 Elasticsearch 存储和索引
  • 从前端 UI(使用 Superset 预览)或 API 去使用、消费、管理和发现元数据
  • 通过查询和 UI 对 NebulaGraph,我们可以获得更多的可能性、灵活性和数据、血缘的洞察

5.1 涉及到的开源

此参考项目中使用的所有项目都按字典顺序在下面列出。

  • Amundsen
  • Apache Airflow
  • Apache Superset
  • dbt
  • Elasticsearch
  • meltano
  • Nebula Graph
  • Open Lineage
  • singer

题图版权: Phil Hearing

NebulaGraph 的云原生 API 网关最佳实践

2022年11月16日 17:24

本文介绍了利用开源 API 网关 APISIX 加速 NebulaGraph 多个场景的落地最佳实践:负载均衡、暴露接口结构与 TLS Termination。

1 API 网关介绍

1.1 什么是 API 网关

API 网关是它位于客户端和服务器之间的“中间人”,用于管理、监控和保护 API。它可以在 API 之前执行一些操作,例如身份验证、授权、缓存、日志记录、审计、流量控制、安全、防火墙、压缩、解压缩、加密、解密等。

API 网关可以工作在 4 层和 7 层。跑在 7 层的API 网关可以使用多种协议,例如 HTTP、HTTPS、WebSocket、gRPC、MQTT 等,在这些应用层协议中做一些操作,例如请求重写、请求转发、请求合并、请求重试、请求缓存、请求限流、请求熔断、请求降级、请求鉴权、请求监控、请求日志、请求审计、请求转发等等。

这里举例一下借助 API 网关可以做的具体的事儿吧:

  • 我们可以在网关层增加认证层,比如 JWT 认证、OAuth2 认证、OpenID 认证等等,这样则不需要在每个服务中都做具体的认证集成工作,这可以节省非常多的开发成本。
  • 我们可以借助网关给跳板机 SSH 流量增加无需客户端修改的复杂认证,比如跳转任何客户端的 SSH 登录,给出一个网址或者输入框,引导登陆者通过网页的 SSO 认证(包含多因素认证),然后再通过网关转发到 SSH 服务。
  • 我们甚至可以在网关层做 Serverless 数据库!TiDB 社区的同学们就在做这个事儿,他们从普通的 MySQL 客户端的登录请求中解析能推断出转到需要的 TiDB 示例的信息,并且在需要 cold start 唤醒实例的时候把连接保持住,可以参考这篇文章:TiDB Gateway
  • 如果我们特别惨在维护一些屎山项目,不得不针对旧版本的应用程序对新版本的服务端进行兼容,这时候 API 网关也可以通过一些请求重写,把旧版本的请求转换成新版本的请求。

只要脑洞大,理论上API 的网关可以做很多很多事儿,显然,不是所有的事情都是适合在这一层面去做的,通常那些比较通用的事情才适合在这一层面去做,这里我只是给出一些典型和极端的具体例子。

1.2 Apache APISIX

API 网关是从 LB、Reverse Proxy 项目演进过来的,随着云原生的兴起,API 网关也逐渐成为了云原生的一部分,流行的开源的网关就有很多:

而且其中很多都是基于 Nginx/OpenResty 的下游项目,这里就以 Apache APISIX 为例,介绍一下 NebulaGraph 借助 API 网关的几个实践。

2 NebulaGraph 介绍

NebulaGraph 是一个开源的分布式图数据库,它的特点是:

  • 高性能:NebulaGraph 的性能可以达到每秒百万级的读写,具有极高的扩展性,在千亿点万亿边的规模上支持毫秒级的查询。
  • 易扩展:NebulaGraph 的架构是分布式的,可以在多台机器上扩展,每台机器上可以运行多个服务进程,它的查询层是无状态的计算存储分离架构,我们可以很容易引入不同配置、不通类型的计算层,实现同一集群上 TP、AP、 图计算等不同负载的混合查询。
  • 易使用:NebulaGraph 的原生查询语言是类 SQL 的,易于学习和使用,同时支持 OpenCypher。
  • 丰富生态:NebulaGraph 的生态系统正在不断壮大,目前已经有了多个客户端,包括 Java、Python、Go、C++、JavaScript、Spark、Flink 等,同时也有了多个可视化工具,包括 NebulaGraph Studio、Nebula Dashboard、Nebula Explorer 等。

3 本文讨论的问题

本文给出了基于 NebulaGraph 集群应用中涉及到 API 网关的几个场景。

  • 查询接口的负载均衡
  • 底层存储接口的暴露
  • 传输层的加密

3.1 查询接口负载均衡

首先是图数据库查询接口(GraphD)的负载均衡与高可用的问题。

NebulaGraph 内核由三种服务组成:GraphD、MetaD 和 StorageD:

https://docs-cdn.nebula-graph.com.cn/figures/nebula-graph-architecture_3.png

所以,在默认情况下,集群只会暴露 GraphD 的接口,提供给客户端连接,执行 nGQL 的查询。

这其中,GraphD 是无状态的,这意味着我们可以在多个 GraphD 之间做负载均衡。这里,我们有两种方法:基于客户端的(Client-Side LB),与基于代理的。

3.1.1 客户端的负载均衡

客户端的负载均衡,就是在客户端,也就是应用程序中,实现负载均衡的逻辑,NebulaGraph 的各个语言的客户端里边已经内置了简单的轮询(Round-Robin)负载均衡,我们只需要在客户端配置多个 GraphD 的地址就可以了:

比如我们在创建连接池的时候,指定了两个不同的 GraphD 的地址(对应不同进程实例):

1
2
3
4
5
6
7
from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config

config = Config()
config.max_connection_pool_size = 10
connection_pool = ConnectionPool()
connection_pool.init([('127.0.0.1', 9669), ('127.0.0.1', 49433)], config)

我们在取得连接的时候,就会从连接池中随机取得一个连接:

1
2
3
4
5
6
7
In [10]: connection0 = connection_pool.get_connection()

In [11]: connection1 = connection_pool.get_connection()

# 这两个连接的 GraphD 地址是不同的
In [12]: connection0._port, connection1._port
Out[12]: (9669, 49433)

这种客户端的负载均衡的问题在于它的配置、实现细节与应用代码耦合在一起,如果我们需要修改负载均衡的策略,就需要修改应用代码,这样就会增加应用的复杂度。

3.1.2 代理的负载均衡

基于代理的负载均衡,就是在应用程序之前,增加一个代理层,来实现负载均衡的逻辑,这样应用程序就不需要关心负载均衡的问题了。在 k8s 里的话,我们可以使用 k8s 的 Service 来实现这个代理层。

这是一个在 Minikube 中为 NebulaGraph 集群中 GraphD 创建的 Service:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
cat <<EOF | kubectl create -f -
apiVersion: v1
kind: Service
metadata:
  labels:
    app.kubernetes.io/cluster: nebula
    app.kubernetes.io/component: graphd
    app.kubernetes.io/managed-by: nebula-operator
    app.kubernetes.io/name: nebula-graph
  name: nebula-graphd-svc-nodeport
  namespace: default
spec:
  externalTrafficPolicy: Local
  ports:
  - name: thrift
    port: 9669
    protocol: TCP
    targetPort: 9669
    nodePort: 30000
  - name: http
    port: 19669
    protocol: TCP
    targetPort: 19669
    nodePort: 30001
  selector:
    app.kubernetes.io/cluster: nebula
    app.kubernetes.io/component: graphd
    app.kubernetes.io/managed-by: nebula-operator
    app.kubernetes.io/name: nebula-graph
  type: NodePort
EOF

创建了它之后,我们就可以通过它暴露的单独端口来访问 NebulaGraph 集群中的 GraphD 了:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
In [13]: connection_pool = ConnectionPool()
    ...: connection_pool.init([('192.168.49.2', 9669)], config)
Out[13]: True

In [14]: connection0 = connection_pool.get_connection()

In [15]: connection1 = connection_pool.get_connection()

In [16]: connection0._ip, connection1._ip
Out[16]: ('192.168.49.2', '192.168.49.2')

可以看到,在连接层面上来看,客户端只知道代理的地址,而不知道 NebulaGraph 集群中的 GraphD 的地址,这样就实现了客户端与 NebulaGraph 集群中的 GraphD 的解耦。

然而,当我们在 connection 之上创建 session 的时候,就能看到实际上客户端的不同请求是落在了不同的 GraphD 上的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
In [17]: session = connection_pool.get_session('root', 'nebula')

In [18]: session._session_id
Out[18]: 1668670607568178

In [19]: session1 = connection_pool.get_session('root', 'nebula')

In [20]: session1._session_id
Out[20]: 1668670625563307

# 得到每一个 session 的 ID

In [21]: session.execute("SHOW SESSIONS")

# 它们分别对应了两个不同的 graphd 实例

Out[21]: ResultSet(keys: ['SessionId', 'UserName', 'SpaceName', 'CreateTime', 'UpdateTime', 'GraphAddr', 'Timezone', 'ClientIp'], values: [1668670607568178, "root", "", utc datetime: 2022-11-17T07:36:47.568178, timezone_offset: 0, utc datetime: 2022-11-17T07:36:47.575303, timezone_offset: 0, "nebula-graphd-0.nebula-graphd-svc.default.svc.cluster.local:9669", 0, "172.17.0.1"],[1668670625563307, "root", "", utc datetime: 2022-11-17T07:37:05.563307, timezone_offset: 0, utc datetime: 2022-11-17T07:37:03.638910, timezone_offset: 0, "nebula-graphd-1.nebula-graphd-svc.default.svc.cluster.local:9669", 0, "172.17.0.1"])

3.2 底层存储接口的暴露

在 NebulaGraph 中,我们可以通过 StorageClient 来访问底层的存储接口,这个接口可以用来做一些分析型、数据全扫描计算的工作。

然而存储层的分布式服务实例不像 GraphD 那样,它们是有状态的,这其实与 K8s 或者 Docker Compose 的部署模型是相违背的,如果访问的应用 StorageD 客户端在集群外部,我们需要在 NebulaGraph 集群中的每一个存储实例上都部署一个代理(Service),这非常不方便,有时候还是一种浪费。

此外,由于 NebulaGraph 内部服务发现机制和 StorageD 客户端的实现机制决定,每一个 storaged 服务实体都是由其内部的 host:port 唯一确定和寻址的,这给我们中间的代理工作也带来了一些麻烦。

总结来看,我们的需求是:

  • 能够从集群外部访问 NebulaGraph 的存储层每一个实例
  • 每一个实例的访问地址(host:port)和内部的地址是完全一致的

为了实现这个需求,我之前的做法是为每一个实例单独部署一个 GraphD 代理(消耗一个地址,保证端口不变),再在外部手动搭一个 nginx 作为代理,配合 DNS 把内部的地址解析 nginx 上,然后通过域名找到上游(每一个单独的 GraphD 代理)。

注:我在这两个 gist 里给出了这个方法的实验步骤:

最近,我找到了一个相对优雅的可维护的方式:

  • 在 NebulaGraph 集群同一个命名空间下引入一个 APISIX 网关
  • 利用 APISIX 中的 nginx TCP 代理的封装:stream-proxy来暴露 storeaged 的接口
  • 为了最终只利用一个集群的出口(Service,我们利用其支持的 TLSv1.3 中的 extend host name 字段:SNI 来路由上游),做到用不同域名的 TCP over TLS 指向后端的不同 storaged
  • 最终,只需要 Storage 客户端能支持 TLSv1.3(发送 SNI),并且能解析所有 StorageD 的地址到 APISIX 的 Service 上即可

示例图:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
           ┌────────────────────────────────────────────────────────────────────────────────────┐
           │  K8s Cluster                                                                       │
           │                                                      ┌──────────────────────────┐  │
           │          ┌────────────────────────────────────┐      │ NebulaGraph Cluster      │  │
           │          │  APISIX API-GATEWAY                │      │       ┌──────────────┐   │  │
           │          │                                    │      │       │ Storaged-0   │   │  │
           │          │                                    │ ┌────┼──────▶│              │   │  │
           │          │                                    │ │    │       │              │   │  │
           │          │   ┌────────────────────────────┐   │ │    │       └──────────────┘   │  │
           │          │   │ stream-proxy               │   │ │    │                          │  │
  ┌─────┐  │ .─────.  │   │                ┌────┐      │   │ │    │       ┌──────────────┐   │  │
  │     │  │╱       ╲ │   │  - addr: 9559  │    │──────┼───┼─┘    │       │ Storaged-1   │   │  │
━━┫ DNS ┣━━( Service )╋━━━╋▶   tls: true   │    │      │   │ ┌────┼──────▶│              │   │  │
  │     │  │`.     ,' │   │                │    │──────┼───┼─┘    │       │              │   │  │
  └─────┘  │  `───'   │   │                │    │      │   │      │       └──────────────┘   │  │
           │          │   │                │SNI │      │   │      │                          │  │
           │          │   │                │    │──────┼───┼─┐    │       ┌──────────────┐   │  │
           │          │   │                │    │      │   │ │    │       │ Storaged-2   │   │  │
           │          │   │                │    │      │   │ └────┼──────▶│              │   │  │
           │          │   │                │    │──────┼───┼─┐    │       │              │   │  │
           │          │   │                └────┘      │   │ │    │       └──────────────┘   │  │
           │          │   └────────────────────────────┘   │ │    │                          │  │
           │          │                                    │ │    │       ┌──────────────┐   │  │
           │          │                                    │ │    │       │ Storaged-3   │   │  │
           │          │                                    │ └────┼──────▶│              │   │  │
           │          │                                    │      │       │              │   │  │
           │          │                                    │      │       └──────────────┘   │  │
           │          └────────────────────────────────────┘      └──────────────────────────┘  │
           │                                                                                    │
           └────────────────────────────────────────────────────────────────────────────────────┘

这样做的好处是:

  • 在 APISIX 中比较优雅地维护代理的配置,并且可以用到 APISIX 这些现代化的流量管理能力
  • 不需要为每一个 StorageD 单独创建 Service,只需要一个 Service,出集群地址 就可以了
  • 为流量增加了 TLSv1.3 的加密,提高了安全性同时没有给 NebulaGraph 集群内部的南北流量带来的性能损耗

在本文的结尾,我会给出一个实操的实验过程,这里包含了本文提到的所有要点和细节。

3.3 传输层的加密

我们在前一个问题中提及到了,在 APISIX 网关中 terminate TLSv1.3 的连接,借助 SNI 信息路由 StorageD 的方法,其实,单独将 GraphD 接口的 TLS 交给网关来做,好处也是非常明显的:

  • 证书管理在统一的网关控制面做,更加方便
  • 证书运维无 NebulaGraph 集群配置侵入(NebulaGraph 原生支持 TLS 加密,但是加密之后带来了集群内部通信的开销,而且配置和集群其他层面配置在一起,证书更新涉及进程重启,不够灵活)

具体的方法在后边实操中也是有体现的。

4 实操:利用 APISIX 的 stream-proxy 暴露 StorageD 的接口

4.1 实验环境:minikube

我们就在本地的 minikube 上做这个实验吧,首先启动一个 minikube,因为 APISIX 内部的 etcd 需要用到 storageclass,我们带上 穷人版的 storageclass 插件,同时,为了在 k8s 外部访问 storaged 的时候用和内部相同的域名和端口,我们把 node-port 允许的端口扩充到小于 9779 的范围。

1
2
3
minikube start \
    --addons="default-storageclass" \
    --extra-config=apiserver.service-node-port-range=1-65535

4.2 实验环境:NebulaGraph on K8s

这里,我们使用 Nebula Operator 来部署 NebulaGraph 集群,具体的部署方法可以参考 Nebula Operator 文档

咱们做实验,就偷个懒,用我写的 Nebula-Operator-KinD 来一键部署:

1
curl -sL nebula-kind.siwei.io/install-on-k8s.sh | bash

4.3 实验环境:APISIX on k8s

首先是安装,在 Helm 参数中指定打开 stream-proxy 的开关:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
helm repo add apisix https://charts.apiseven.com
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install apisix apisix/apisix \
  --set gateway.type=NodePort \
  --set gateway.stream.enabled=true \
  --set ingress-controller.enabled=true

# dashboard 也装上,方便我们绕过 admin API call 做一些方便的操作。
helm install apisix-dashboard apisix/apisix-dashboard

然后,因为截止到现在,APISIX 的 Helm Chart 之中并没有提供 stream-proxy TCP 的监听端口的 TLS 支持的配置格式,见 https://github.com/apache/apisix-helm-chart/issues/348 ,我们需要手动更改 APISIX 的 configmap,把 stream-proxy 的 TLS 配置加上:

1
kubectl edit ConfigMap apisix

我们编辑把 stream_proxy.tcp 改写成这样:

1
2
3
4
5
6
7
     stream_proxy:                 # TCP/UDP proxy
       only: false
       tcp:                        # TCP proxy port list
         - addr: 9779
           tls: true
         - addr: 9559
           tls: true

这里我们需要重建 APISIX Pod,因为 APISIX 的 stream-proxy 的 TLS 配置是在启动的时候加载的,所以我们需要重建 APISIX Pod:

1
kubectl delete $(kubectl get po -l "app.kubernetes.io/name=apisix" -o name)

4.4 开始实验

我们看看这个实验的目标,就是把 NebulaGraph 的 StorageD 的接口暴露出来,让外部的客户端可以访问到,而暴露的方式如图:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
           ┌────────────────────────────────────────────────────────────────────────────────────┐
           │  K8s Cluster                                                                       │
           │                                                      ┌──────────────────────────┐  │
           │          ┌────────────────────────────────────┐      │ NebulaGraph Cluster      │  │
           │          │  APISIX API-GATEWAY                │      │       ┌──────────────┐   │  │
           │          │                                    │      │       │ Storaged-0   │   │  │
           │          │                                    │ ┌────┼──────▶│              │   │  │
           │          │                                    │ │    │       │              │   │  │
           │          │   ┌────────────────────────────┐   │ │    │       └──────────────┘   │  │
           │          │   │ stream-proxy               │   │ │    │                          │  │
  ┌─────┐  │ .─────.  │   │                ┌────┐      │   │ │    │       ┌──────────────┐   │  │
  │     │  │╱       ╲ │   │  - addr: 9559  │    │──────┼───┼─┘    │       │ Storaged-1   │   │  │
━━┫ DNS ┣━━( Service )╋━━━╋▶   tls: true   │    │      │   │ ┌────┼──────▶│              │   │  │
  │     │  │`.     ,' │   │                │    │──────┼───┼─┘    │       │              │   │  │
  └─────┘  │  `───'   │   │                │    │      │   │      │       └──────────────┘   │  │
           │          │   │                │SNI │      │   │      │                          │  │
           │          │   │                │    │──────┼───┼─┐    │       ┌──────────────┐   │  │
           │          │   │                │    │      │   │ │    │       │ Storaged-2   │   │  │
           │          │   │                │    │      │   │ └────┼──────▶│              │   │  │
           │          │   │                │    │──────┼───┼─┐    │       │              │   │  │
           │          │   │                └────┘      │   │ │    │       └──────────────┘   │  │
           │          │   └────────────────────────────┘   │ │    │                          │  │
           │          │                                    │ │    │       ┌──────────────┐   │  │
           │          │                                    │ │    │       │ Storaged-3   │   │  │
           │          │                                    │ └────┼──────▶│              │   │  │
           │          │                                    │      │       │              │   │  │
           │          │                                    │      │       └──────────────┘   │  │
           │          └────────────────────────────────────┘      └──────────────────────────┘  │
           │                                                                                    │
           └────────────────────────────────────────────────────────────────────────────────────┘

我们已经有了所有的框架,我们要往里填箭头和圆圈了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
$ kubectl get po
NAME                                         READY   STATUS     RESTARTS      AGE
apisix-6d89854bc5-5m788                      1/1     Running    1 (31h ago)   2d4h
apisix-dashboard-b544bd766-nh79j             1/1     Running    8 (31h ago)   2d10h
apisix-etcd-0                                1/1     Running    2 (31h ago)   2d10h
apisix-etcd-1                                1/1     Running    2 (31h ago)   2d10h
apisix-etcd-2                                1/1     Running    2 (31h ago)   2d10h
nebula-graphd-0                              1/1     Running    2 (31h ago)   3d4h
nebula-metad-0                               1/1     Running    2 (31h ago)   3d4h
nebula-storaged-0                            1/1     Running    2 (31h ago)   3d4h
nebula-storaged-1                            1/1     Running    2 (31h ago)   3d4h
nebula-storaged-2                            1/1     Running    2 (31h ago)   3d4h

4.4.1 配置 APISIX 的 stream-proxy

参考 APISIX 文档:https://apisix.apache.org/docs/apisix/stream-proxy/#accept-tls-over-tcp-connection

我们用 APISIX 的 API 来配置 stream-proxy:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

apisix_api_key="edd1c9f034335f136f87ad84b625c8f1"
apisix_pod=$(kubectl get po -l \
    "app.kubernetes.io/name=apisix" -o name)

kubectl exec -it $apisix_pod -- \
    curl http://127.0.0.1:9180/apisix/admin/stream_routes/1 \
    -H "X-API-KEY: $apisix_api_key" -X PUT -d \
'{
    "sni": "nebula-storaged-0.nebula-storaged-headless.default.svc.cluster.local",
    "upstream": {
        "nodes": {
            "172.17.0.13:9779": 1
        },
        "type": "roundrobin"
    }
}'

kubectl exec -it $apisix_pod -- \
    curl http://127.0.0.1:9180/apisix/admin/stream_routes/2 \
    -H "X-API-KEY: $apisix_api_key" -X PUT -d \
'{
    "sni": "nebula-storaged-1.nebula-storaged-headless.default.svc.cluster.local",
    "upstream": {
        "nodes": {
            "172.17.0.18:9779": 1
        },
        "type": "roundrobin"
    }
}'

kubectl exec -it $apisix_pod -- \
    curl http://127.0.0.1:9180/apisix/admin/stream_routes/3 \
    -H "X-API-KEY: $apisix_api_key" -X PUT -d \
'{
    "sni": "nebula-storaged-2.nebula-storaged-headless.default.svc.cluster.local",
    "upstream": {
        "nodes": {
            "172.17.0.5:9779": 1
        },
        "type": "roundrobin"
    }
}'

注意,当下,APISIX 的 stream-proxy 上游节点不支持域名解析,是受限于上游的 lua 库,详见我报的 issue:https://github.com/apache/apisix/issues/8334 ,理想情况下,我们这里应该给出每一个 storaged 的 SNI 相同的地址作为 upstream.nodes,好像:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
kubectl exec -it $apisix_pod -- \
    curl http://127.0.0.1:9180/apisix/admin/stream_routes/1 \
    -H "X-API-KEY: $apisix_api_key" -X PUT -d \
'{
    "sni": "nebula-storaged-0.nebula-storaged-headless.default.svc.cluster.local",
    "upstream": {
        "nodes": {
            "nebula-storaged-0.nebula-storaged-headless.default.svc.cluster.local": 1
        },
        "type": "roundrobin"
    }
}'

4.4.2 配置 APISIX 中 storaged 地址的 TLS 证书

在生产环境下,我们应该云原生的方式去管理自签或者公共信任的证书,这里,我们就手动利用 mkcert 工具来做这件事儿。

安装 mkcert

1
2
3
4
5
6
# 首次运行,需要安装 mkcert,并且生成根证书
# macOS 的话
brew install mkcert
# ubuntu 的话
apt-get install wget libnss3-tools
# 然后再去 https://github.com/FiloSottile/mkcert/releases/ 下载 mkcert

签发证书:

1
mkcert '*.nebula-storaged-headless.default.svc.cluster.local'

利用 APISIX-dashboard 将证书导入到 APISIX 之中

单独开一个终端,运行:

1
2
3
4
5
6
7
8
9
export POD_NAME=$(\
    kubectl get pods \
    -l "app.kubernetes.io/name=apisix-dashboard,app.kubernetes.io/instance=apisix-dashboard" \
    -o jsonpath="{.items[0].metadata.name}")
export CONTAINER_PORT=$(\
    kubectl get pod $POD_NAME \
    -o jsonpath="{.spec.containers[0].ports[0].containerPort}")
kubectl \
    port-forward $POD_NAME 8080:$CONTAINER_PORT --address='0.0.0.0'

浏览器访问 http://10.1.1.168:8080/ssl/list ,账号密码都是 admin ,点击 Create 按钮,将刚刚生成的证书导入到 APISIX 之中。

https://user-images.githubusercontent.com/1651790/202471244-7081b37e-1e8f-4298-8887-db2feefe74a2.png

4.4.3 增加 APISIX 的 NodePort Service

创建一个 NodePort Service,用于暴露 APISIX 的 9779 端口,这样,我们就可以通过外部的 IP 地址访问到 APISIX 了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
cat <<EOF | kubectl apply -f -
spec:
  selector:
    app.kubernetes.io/instance: apisix
    app.kubernetes.io/name: apisix
  ports:
    - protocol: TCP
      port: 9779
      targetPort: 9779
      name: thrift
      nodePort: 9779
  type: NodePort
EOF

因为前边 minikube 中我们配置了端口的范围覆盖到了 9779,所以我们可以看到,这个 NodePort Service 的端口在宿主机上也可以从 minikube ip 的同一个端口访问到:

1
2
3
4
5
6
7
8
9
$ minikube service apisix-svc
$ minikube service list
|------------------------|---------------------------------|-------------------|---------------------------|
|       NAMESPACE        |              NAME               |    TARGET PORT    |            URL            |
|------------------------|---------------------------------|-------------------|---------------------------|
...
| default                | apisix-svc                      | thrift/9779       | http://192.168.49.2:9779  |<---
...
|------------------------|---------------------------------|-------------------|---------------------------|

当然,minikube 假设我们的服务都是 HTTP 的,给出的 URL 是 HTTP:// 的,不用理会它,我们心里知道它是 TCP over TLS 就好了。

4.4.4 配置 K8s 外部 DNS

我们需要配置一个 DNS 服务,让我们可以通过 nebula-storaged-0.nebula-storaged-headless.default.svc.cluster.local 等三个域名通过 minikube 的 NodePort Service 访问到我们的 NebulaGraph 的 storaged 服务。

获得 minikube 的 IP 地址:

1
2
$ minikube ip
192.168.49.2

配置 /etc/hosts

1
2
3
4
192.168.49.2 nebula-storaged-0.nebula-storaged-headless.default.svc.cluster.local
192.168.49.2 nebula-storaged-1.nebula-storaged-headless.default.svc.cluster.local
192.168.49.2 nebula-storaged-2.nebula-storaged-headless.default.svc.cluster.local
192.168.49.2 nebula-metad-0.nebula-metad-headless.default.svc.cluster.local

4.4.5 验证 NebulaGraph Storage Client 可以从所有的节点中获取到数据

这里,为了方便,我们用到 python 的客户端。

由于在写本文的时候,NebulaGraph Python 客户端的 StorageClient 尚未支持 TLS,对它支持的 PR 刚好是我为了本实验写的:https://github.com/vesoft-inc/nebula-python/pull/239 。

所以我们要从我的个人分支安装这个客户端:

1
2
3
4
5
6
git clone https://github.com/wey-gu/nebula-python.git
cd nebula-python
python3 -m pip install .
python3 -m pip install ipython
# 进入 ipython
ipython

我们在 iPython 中交互式验证:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from nebula3.mclient import MetaCache, HostAddr
from nebula3.sclient.GraphStorageClient import GraphStorageClient
from nebula3.Config import SSL_config
import ssl
import os

meta_cache = MetaCache([('nebula-metad-0.nebula-metad-headless.default.svc.cluster.local', 9559)],
                       50000)

storage_addrs = [HostAddr(host='nebula-storaged-0.nebula-storaged-headless.default.svc.cluster.local', port=9779),
                 HostAddr(host='nebula-storaged-1.nebula-storaged-headless.default.svc.cluster.local', port=9779),
                 HostAddr(host='nebula-storaged-2.nebula-storaged-headless.default.svc.cluster.local', port=9779)]

# 自签证书配置
current_dir = os.path.abspath(".")

ssl_config = SSL_config()
ssl_config.cert_reqs = ssl.CERT_OPTIONAL
ssl_config.cert_reqs = ssl.CERT_OPTIONAL
ssl_config.ca_certs = os.path.join(
    os.path.expanduser("~/.local/share/mkcert"), 'rootCA.pem'
)
ssl_config.keyfile = os.path.join(
    current_dir, 'nebula-storaged-headless.default.svc.cluster.local+1-key.pem'
)
ssl_config.certfile = os.path.join(
    current_dir, 'nebula-storaged-headless.default.svc.cluster.local+1.pem'
)

# 实例化 StorageClient

graph_storage_client = GraphStorageClient(meta_cache, storage_addrs, 5000, ssl_config)

# 验证可以从所有的节点中获取到数据
resp = graph_storage_client.scan_vertex(
        space_name='basketballplayer',
        tag_name='player')
while resp.has_next():
    result = resp.next()
    for vertex_data in result:
        print(vertex_data)

结果✅:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
("player112" :player{name: "Jonathon Simmons", age: 29})
("player1120" :player{name: "李四", age: 30})
("player117" :player{name: "Stephen Curry", age: 31})
("player119" :player{name: "Kevin Durant", age: 30})
("player134" :player{name: "Blake Griffin", age: 30})
("player141" :player{name: "Ray Allen", age: 43})
("player144" :player{name: "Shaquille O'Neal", age: 47})
("player149" :player{name: "Ben Simmons", age: 22})
("player100" :player{name: "Tim Duncan", age: 42})
("player101" :player{name: "Tony Parker", age: 36})
("player110" :player{name: "Cory Joseph", age: 27})
("player126" :player{name: "Kyrie Irving", age: 26})
("player131" :player{name: "Paul George", age: 28})
("player133" :player{name: "Yao Ming", age: 38})
("player140" :player{name: "Grant Hill", age: 46})
("player105" :player{name: "Danny Green", age: 31})
("player109" :player{name: "Tiago Splitter", age: 34})
("player111" :player{name: "David West", age: 38})
...

5 总结

  • NebulaGraph 查询接口的负载均衡可以借助 K8s Service来做;
  • NebulaGraph 底层存储接口的暴露在 K8s 中可以利用 APISIX Stream Proxy 和 SNI 来优雅实现;
  • 利用 API 网关对出口传输层的加密是一个很好的选择,相较于用 NebulaGraph 原生的 TLS 的方式。

6 一些坑

  • 发现 fbthrift python 并不支持 发送 extend host name(SNI),https://github.com/vesoft-inc/nebula-python/pull/238 ,写了 PR 去做支持,这时候 APISIX 中的报错是 failed to find SNI:

    1
    2
    3
    
    2022/11/15 10:18:26 [error] 78#78: *1744270 stream [lua] init.lua:842: stream_ssl_phase(): failed to fetch ssl config: failed to find SNI: 
    please check if the client requests via IP or uses an outdated protocol. If you need to report an issue, provide a packet capture file of the TLS handshake., context: 
    ssl_certificate_by_lua*, client: 172.17.0.1, server: 0.0.0.0:9779
    

    参考:

  • 发现 APISIX stream 里边不解析上游 node 域名,我查了所有一溜的 dns 都没有问题,去提了 issue 才知道是已知问题:https://github.com/apache/apisix/issues/8334,只好先手配 IP:Port 作罢。

    1
    2
    3
    
    2022/11/15 12:26:59 [error] 44#44: *9538531 stream [lua] resolver.lua:47: parse_domain(): failed to parse domain: nebula-storaged-0.nebula-storaged-headless.default.svc.cluster.local, error: failed to query the DNS server: dns client error: 101 empty record received while prereading client data, client: 172.17.0.1, server: 0.0.0.0:9779
    2022/11/15 12:26:59 [error] 44#44: *9538531 stream [lua] upstream.lua:79: parse_domain_for_nodes(): dns resolver domain: nebula-storaged-0.nebula-storaged-headless.default.svc.cluster.local error: failed to query the DNS server: dns client error: 101 empty record received while prereading client data, client: 172.17.0.1, server: 0.0.0.0:9779
    2022/11/15 12:26:59 [error] 44#44: *9538531 stream [lua] init.lua:965: stream_preread_phase(): failed to set upstream: no valid upstream node while prereading client data, client: 172.17.0.1, server: 0.0.0.0:9779
    

题图版权 Lars

利用 dbt,基于表结构的 Nebulagraph 图建模与 ETL

2022年11月10日 20:19

如何把相对原始的数据处理、建模并导入 NebulaGraph?本文用一个端到端的示例演示,从多数据源聚合数据,清理、利用 dbt 转换成 NebulaGraph 建模的属性图点边记录,最后导入成图谱的全流程。

1 任务

假设作为一个类似于 Netflix、爱奇艺的服务提供商,我们需要利用 NebulaGraph 搭建一个用户-电影知识图谱,来辅助支撑推荐、问答和推荐理由等常见由图谱支撑的场景。

知识图谱需要的数据存在在不同的数据源,比如一些公开的 API、数仓中的不同数据库、静态的文件等。这时候,我们需要以下几个步骤来从数据构建图谱:

  • 分析可能获取的数据
  • 选取关心的关联关系,图建模
  • 抽取关联关系,导入图数据库

2 数据来源

假设我们的数据来源是 OMDBMovieLens

OMDB 是一个开放的电影数据库,本例中我们模拟公司内部的业务系统,我们可以获得的信息有:

  • 电影
  • 电影的分类
  • 电影中的工作人员(导演、动作指导、演员、后期制作等)
  • 电影封面、宣传片等

MovieLens 是一个开放的数据集,本例中我们模拟公司内部的用户数据,我们可以获得的信息有:

  • 用户
  • 电影
  • 用户对电影的评分交互

3 图建模

在前边我们推荐系统的文章中我们介绍了推荐系统在图上的一些基本的方法(文章的链接是 www.siwei.io/recommendation-system-with-graphdb/)。其中的基于内容过滤关注了用户–>电影、电影–>分类、电影–>演员、电影–>导演的关系,协同过滤的方法则关注用户–>电影的关系,推荐理由服务则关注以上所有的关系,所以总结起来,我们需要的边有:

  • watched(rate(double))

  • with_genre

  • directed_by

  • acted_by

相应的,其中的顶点类型,我们先根据已有的信息中,顶点中可能需要被关注的信息作为属性,给出初始的规划:

  • user(user_id)

  • movie(name)

  • person(name, birthdate)

  • genre(name)

schema_0

4 数据转换

有了目标的图谱结构定义,我们来看看手上的数据如何映射到它。

4.1 OMDB 数据

首先是 OMDB 中的数据,它由很多表组成,比如 all_movies 这张表,存储了所有的电影、以及它们在不同语言下的名字:

movie_id name language_iso_639_1 official_translation
1 Cowboy Bebop de 1
1 Cowboy Bebop en 1
2 Ariel - Abgebrannt in Helsinki de 0
3 Shadows in Paradise en 0
3 Im Schatten des Paradieses de 0
3 Schatten im Paradies de 1

all_casts 表格中保有所有电影相关的工作人员:

movie_id person_id job_id role position
11 1 21 1
11 1 13 1
11 2 15 Luke Skywalker 1
11 3 15 Han Solo 3
11 4 15 Leia Organa 2

但是这里的每一个人的姓名等信息、以及他/她在电影中任职的职位,则分别在另外的表中:

  • job_names

    比如 1 代表编剧、2 代表制作人,有意思的是,和电影 id 与姓名一样,job_id 到 name 是一对多的关系,因为 OMDB 中的数据都是多语言的。

job_id name language_iso_639_1
1 Autoren de
1 Writing Department en
1 Departamento de redacción es
1 Département écriture fr
1 Scenariusz pl
2 Produzenten de
2 Production Department en
  • all_people
id name birthday deathday gender
1 George Lucas 1944-05-14 \N 0
2 Mark Hamill 1951-09-25 \N 0
3 Harrison Ford 1942-07-13 \N 0
4 Carrie Fisher 1956-10-21 2016-12-27 1
5 Peter Cushing 1913-05-26 1994-08-11 0
6 Anthony Daniels 1946-02-21 \N 0

这是在数据来源是表结构、RDBMS 中,是一个很典型的情况,所以对于 movie <-[directed_by]-(person) 这个关系,就涉及了 all_moviesall_castsall_peoplejob_names 四个表格

  • directed_by
    • 起点 person_id 在 all_casts 之中
    • 终点 movie_id 在 all_casts 之中
      • 条件是 job_id 为 job_names 之中的 “director”
  • movie
    • person_id 在 all_casts 之中
    • 名字来自 all_movies 中按 id 查找,language 为 “en”
  • person
    • movie_id 在 all_casts 之中
    • 名字、生日在 all_people 之中

所有 OMDB 中我们关心的表的关联如图:

modeling_omdb

4.2 MovieLens 数据集

而上边只是一个数据源、数据表或者数仓的数据,在真实场景中,我们还需要从其他源头收取数据,并聚合起来,在本例中,我们还需要从 MovieLens 的数据集中抽取需要的知识。

这里,涉及到 MovieLens 数据集,我们利用的只有:用户–>电影,这一条关系。

  • movies.csv
movieId title genres
1 Toy Story (1995) Adventure
2 Jumanji (1995) Adventure
3 Grumpier Old Men (1995) Comedy
4 Waiting to Exhale (1995) Comedy
  • ratings.csv
userId movieId rating timestamp
1 1 4 964982703
1 3 4 964981247
1 6 4 964982224

从两个表的数据预览似乎可以得出:

  • watched
    • 起点来自于 ratings.csv 中的 userId
    • 终点来自于 ratings.csv 中的 movieId
    • 评分来自于 ratings.csv 中的 rating
  • user
    • 来自于 ratings.csv 中的 userId

然而,细心的你们一定发现 MovieLens 数据集中的 movieId 和来自于 OMDB 中的电影 id 完全是不同的两套体系,如果我们需要让他们关联起来,需要将 MovieLens 里的 movieId 转换成为 OMDB 中的电影 id。而他们之间的关联条件则是电影的标题。

然而,通观察,我们知道:

  1. OMDB 电影中标题是多语言的
  2. MovieLens 中的标题结尾带有(1995)这样的年份信息

所以我们最终的结论为

  • watched
    • 起点来自于 ratings.csv 中的 userId
    • 终点来自于 ratings.csv 中的 movieId
      • 终点要从 movies.csv 中的 title ,在 OMDB 之中查找,得到 OMDB 的 movie_id
        • 查找条件为去掉年份,从 OMDB 的英文标题中进行匹配
    • 评分来自于 ratings.csv 中的 rating
  • user
    • 来自于 ratings.csv 中的 userId

现在,这个表格之间的关系如下

modeling_omdb_movielens

4.3 映射数据到图谱(属性图)

总结起来,我们需要对多个数据源中的不同表格(或者表格形式的 CSV 文件)进行聚合,这样的对应关系如图所示:其中蓝色虚线表示图中顶点的数据信息来源,粉色虚线表示边信息的来源。

schema_mapping_to_graph

最后,我们还要对不同表中个体的 id 进行格式化,比如 user_id,是自增的数字,我们要转换成全局唯一的 vertex_id,一个方便的方式是在现有 id 的基础上增加字符串前缀,比如 u_

最终,拿对于 user -[watched]-> movie 这一个关系来说,我们可以处理得到这样的表结构数据:

user_id rating title omdb_movie_id
u_1 5 Seven (a.k.a. Se7en) 807
u_1 5 Star Wars: Episode IV - A New Hope 11
u_1 5 Star Wars: Episode IV - A New Hope 10
u_1 4 Mask, The 832
u_1 3 Mrs. Doubtfire 832

其中每一行记录中存在三个图上的结构信息:

  • user 顶点 id
  • movie 顶点 id
  • watched 边的 rating 值

5 工具

到此,我们已经完成了数据的分析与建模设计,在进入”抽取关联关系,导入图数据库“环节之前,先介绍一下我们要用到的工具。

”抽取关联关系“可以简单认为是 ETL 中的 Extract 和 Transform。本质上就是工程上执行数据映射与转换的工作,市面上有很多不同风格的工具、开源项目可以做。这里我们用到我个人比较喜欢的工具:dbt。

5.1 数据转换利器 dbt

dbt 是一个开源的数据转换工具,他有非常成熟的社区和生态,可以在大多数主流数仓之中进行高效、可控、高质量的数据转换工作,无论是临时的转换工作(ad-hoc),还是在给定的定时 pipeline 中进行复杂编排,dbt 都可以很好胜任,他的一大特色就是使用 SQL-like 语言去描述数据转换的规则,基于 GitOps,可以非常优雅地去多人协作、维护超大规模数据团队里复杂的数据处理作业。而且内置的数据测试能力可以很好的控制数据的质量,做到可复现、可控制。

dbt 不仅有很多集成的子项目,还能和很多其他优秀的开源项目有机结合(meltano、AirFlow、Amundsen 、Superset 等等),形成一整套现代的数据基础设施体系,感兴趣的同学可以参考我之前搭建的数据血缘与元数据治理参考架构文章:www.siwei.io/en/data-lineage-oss-ref-solution。

https://user-images.githubusercontent.com/1651790/168849779-4826f50e-ff87-4e78-b17f-076f91182c43.svg

简单来说,dbt 是一个 python 写的命令行工具,当我们使用它的时候,针对每一个项目,我们可以创建特定格式的项目文件夹,其中包涵一个 YAML 格式的配置文件,在配置文件里指定数据转换的来源信息在哪里,目标在哪里(处理之后的数据存储的地方,可能是 Postgres,Big Query,Spark 等)。在数据源中,我们用 YAML 文件和 .SQL 文件一起描述了”从哪里取哪些数据,如何做变换,输出什么“的信息。

starter-project-dbt-cli

这个截图就是 dbt 官方文档中的示例项目中的文件和配置,可以看到 models/example 里的信息就是最核心的数据转换(transform)的规则,而所有的其他数据都是和这个数据转换相关的元数据,这些 dbt 项目文件非常适合用 git 来进行维护,进行现代、自动化的 DataOps。

注:

可以参考 dbt 文档上手理解它:https://docs.getdbt.com/docs/get-started/getting-started-dbt-core

5.2 NebulaGraph 数据导入

经过 dbt 对数据进行处理之后,我们可以得到直接映射到不同类型的顶点、边、及其属性的表结构的中间数据,它们可以是 CSV 的文件形式,也可以是数仓中的表,甚至可能是 Spark 中的 dataframe。而将它们导入 NebulaGraph 有不同的选择,其中 NebulaGraph Exchange,Nebula-Importer,还有 Nebula-Spark-Connector 都可以作为导入数据。

注:

大家可以在 www.siwei.io/sketches/nebula-data-import-options 了解更多 NebulaGraph 数据导入不同工具的介绍,知道如何选择。

在这里,我就用最简单的 Nebula-Importer 作为例子。

Nebula-Importer 是一个用 Golang 写的开源工具,它可以编译成一个单文件的二进制,通过预配置的 YAML 格式的文件,获得给定的 CSV 文件到 NebulaGraph 中点、边的对应关系,进行读取和导入。

注:

Nebula-Importer 代码:https://github.com/vesoft-inc/nebula-importer/

Nebula-Importer 文档:https://docs.nebula-graph.com.cn/master/nebula-importer/use-importer/

6 实操

现在我们就实操一下如何利用 dbt + Nebula-Importer 进行多数据源聚合、转换、再导入 NebulaGraph 的过程,整个项目的代码已经开源,仓库在 https://github.com/wey-gu/movie-recommendation-dataset 上,欢迎大家参考、共建。

整个过程如下:

  • 将源数据简单清洗、导入数仓(Postgres)(EL)
  • 用 dbt 对数据进行转换(Transform)、导出为 CSV 文件
  • 用 Nebula Importer 将 CSV 导入 NebulaGraph(L)

ETL_dbt_nebulagraph_importer

6.1 准备 dbt 环境

dbt 是一个 python 项目,我们在一个虚拟的 python3 环境里安装好 dbt 和 dbt-postgres。

1
2
3
python3 -m venv .venv
source .venv/bin/activate
pip install dbt-postgres

创建一个 dbt 项目,并进入到空的项目里:

1
2
dbt init dbt_project
cd dbt_project

看看里边的文件吧:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
$ tree .
.
|-- README.md                      # 项目说明 README
|-- analyses
|-- dbt_project.yml                # 项目配置文件
|-- macros
|-- models                         # transform 来源
|   \-- example
|       |-- my_first_dbt_model.sql # 一个描述了如何从元数据中 SELECT 并处理的规则
|       |-- my_second_dbt_model.sql
|       \-- schema.yml             # 规则文件的元数据配置,描述了 sql 规则的属性
|-- seeds                          # 源数据如果是 CSV 文件,可以放到 seeds 里
|-- snapshots
\-- tests

7 directories, 5 files

最后,咱们拉一个容器里的 Postgres 当做我们这个项目的数仓,如果你已经有各种其他数仓,就不需要这一步了,不过要把项目中的配置文件作相应的修改,并安装相应的 dbt 插件。

1
2
3
4
5
docker run --rm --name postgres \
    -e POSTGRES_PASSWORD=nebula \
    -e POSTGRES_USER=nebula \
    -e POSTGRES_DB=warehouse -d \
    -p 5432:5432 postgres

6.2 数据下载与预处理

我们把数据放到项目的 raw_data 下吧

1
2
mkdir -p raw_data
cd raw_data

注意,假设 raw_data 在 dbt_proeject 之下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
tree ..
..
|-- README.md
|-- analyses
|-- dbt_project.yml
|-- macros
|-- models
|   \-- example
|       |-- my_first_dbt_model.sql
|       |-- my_second_dbt_model.sql
|       \-- schema.yml
|-- raw_data                       # 新建的目录
|-- seeds
|-- snapshots
\-- tests

8 directories, 5 files

我们把 omdb 数据下载之后,再解压:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
wget www.omdb.org/data/all_people.csv.bz2
wget www.omdb.org/data/all_people_aliases.csv.bz2
wget www.omdb.org/data/people_links.csv.bz2
wget www.omdb.org/data/all_casts.csv.bz2
wget www.omdb.org/data/job_names.csv.bz2
wget www.omdb.org/data/all_characters.csv.bz2
wget www.omdb.org/data/movie_categories.csv.bz2
wget www.omdb.org/data/movie_keywords.csv.bz2
wget www.omdb.org/data/category_names.csv.bz2
wget www.omdb.org/data/all_categories.csv.bz2
wget www.omdb.org/data/all_movie_aliases_iso.csv.bz2
bunzip2 *.bz2

然后是 MovieLens 数据集的下载、解压:

1
2
3
wget https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
unzip ml-latest-small.zip
rm *.zip

在导入数仓进行转换(Transform)之前我们做一些数据的预处理,然后把他们放到 seeds 之下。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# 因为是实验项目,我们简单粗暴地去掉带有转义的引号的数据,因为它们会被认为是无效字符,处理之后的结果放到 seeds 下边。
grep -v '\\"' raw_data/all_movie_aliases_iso.csv > seeds/all_movie_aliases_iso.csv
grep -v '\\"' raw_data/all_casts.csv > seeds/all_casts.csv
grep -v '\\"' raw_data/all_characters.csv > seeds/all_characters.csv
grep -v '\\"' raw_data/all_people.csv > seeds/all_people.csv
grep -v '\\"' raw_data/category_names.csv > seeds/category_names.csv
grep -v '\\"' raw_data/job_names.csv > seeds/job_names.csv
# 下边的文件无需处理,直接放到 seeds 下边。
cp raw_data/movie_categories.csv seeds/movie_categories.csv
cp raw_data/movie_keywords.csv seeds/movie_keywords.csv
cp raw_data/all_categories.csv seeds/all_categories.csv
cp raw_data/ml-latest-small/ratings.csv seeds/movielens_ratings.csv
cp raw_data/ml-latest-small/movies.csv seeds/movielens_movies.csv

有了 seeds 下边的文件之后,可以用一个命令把他们导入到数仓里:

参考文档:https://docs.getdbt.com/docs/build/seeds

1
dbt seed

执行过程因数仓而异,用本地的 postgres 可能要等一会儿才能完成,执行结果大概是这样的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
$ dbt seed
05:58:27  Running with dbt=1.3.0
05:58:27  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 289 macros, 0 operations, 11 seed files, 0 sources, 0 exposures, 0 metrics
05:58:28  
05:58:28  Concurrency: 8 threads (target='dev')
05:58:28  
05:58:28  1 of 11 START seed file public.all_casts ....................................... [RUN]
...
07:10:11  1 of 11 OK loaded seed file public.all_casts ................................... [INSERT 1082228 in 4303.78s]
07:10:11  
07:10:11  Finished running 11 seeds in 1 hours 11 minutes and 43.93 seconds (4303.93s).
07:10:11  
07:10:11  Completed successfully
07:10:11  
07:10:11  Done. PASS=11 WARN=0 ERROR=0 SKIP=0 TOTAL=11

6.3 撰写 Transform model

我们创建 model 如下:

1
2
3
mkdir models/movie_recommedation
touch models/movie_recommedation/user_watched_movies.sql
touch models/movie_recommedation/schema.yml

这时候 models 中的文件结构大概是这样的:

1
2
3
4
5
$ tree models
models
\-- movie_recommedation
    |-- user_watched_movies.sql
    \-- schema.yml

这个 model 下边目前只有一个规则,就是负责处理用户观看电影这一个边上数据的 SQL 语句。

我们希望输出三列,所以 schema.yml 中的内容是:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
version: 2

models:
  - name: user_watched_movies
    description: "The edges between users and movies they have watched"
    columns:
      - name: user_id
        description: "user id"
        tests:
          - not_null
      - name: movie_id
        description: "movie id"
        tests:
          - not_null
      - name: rating
        description: "rating given by user to movie"
        tests:
          - not_null

注意,这里的 tests 的表达是对数据验证、测试的约束,有了它,我可以用 dbt 轻松对数据质量进行测试、验收,比如我们要求这里的三个字段都是 not_null

然后,我们来写 SQL 吧,user_watched_movies.sql

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{{ config(materialized='table') }}

/*
 JOIN the movieielens_ratings table with the movieielens_movies table, and removing the movie title tailing the year of release
 */
WITH user_watched_movies AS(
    SELECT moveielens_ratings."userId",
        moveielens_ratings."movieId",
        moveielens_ratings.rating,
        REGEXP_REPLACE(moveielens_movies.title, ' \(\d{4}\)$', '') AS title,
        moveielens_movies.genres AS movielens_genres
    FROM moveielens_ratings
        JOIN moveielens_movies ON moveielens_movies."movieId" = moveielens_ratings."movieId"
)
/* 
 JOIN user_watched_movies table with all_movie_aliase_iso table where language is English
 the join condition is the movie title
 */
SELECT concat('u_',user_watched_movies."userId") AS user_id,
    user_watched_movies.rating,
    user_watched_movies.title,
    all_movie_aliases_iso."movie_id" AS OMDB_movie_id,
    user_watched_movies.movielens_genres
FROM user_watched_movies
    JOIN all_movie_aliases_iso ON user_watched_movies.title LIKE CONCAT(all_movie_aliases_iso.name, '%')
    AND all_movie_aliases_iso.language_iso_639_1 = 'en'

而这个 SQL 做的事情就是绿色圆圈标注的部分:

  • moveielens_ratings 中选 user id、movie id、rating、movie title(去掉年份),存成 user_watched_movies 的中间表格
    • movie title 从 moveielens_movies JOIN ,通过 movie_id 相同的匹配条件取得
  • user_watched_movies 中选 user id(增加前缀 u_)、rating、title、OMDB_movie_id
    • OMDB_movie_id 从 all_movie_aliases_isoJOIN,通过相似的电影姓名匹配 OMDB 电影中英文标题取得
    • 最终的字段作为输出

transform_select_joins_user_watched_movies

小贴士:我们可以在 Postgres 的连接器中通过增加 LIMIT 快速调试自己的 SQL 语句。

然后我们可以通过 dbt 来执行、测试刚刚的规则:

1
dbt run -m user_watched_movies

之后,我们应该就可以在 Postgres(数仓)中看到我们转换之后的一个表了。

类似的,如法炮制所有其他部分的 Transform 规则,我们就有这么多 model 了:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
$ tree models
models
\-- movie_recommedation
    |-- acted_by.sql
    |-- directed_by.sql
    |-- genres.sql
    |-- movies.sql
    |-- people.sql
    |-- schema.yml
    |-- user_watched_movies.sql
    \-- with_genre.sql

再对他们分别执行 transform:

1
2
3
4
5
6
dbt run -m acted_by
dbt run -m directed_by
dbt run -m with_genre
dbt run -m people
dbt run -m genres
dbt run -m movies

6.4 导出数据为 CSV

实际上,NebulaGraph Exchange 本身就支持把很多数据源(Postgres,Clickhouse,MySQL,Hive 等等)导入 NebulaGraph,只是在这个例子中,我们处理的数据量对于 NebulaGraph 来说非常非常小(只有百万级别的边而已),使用最轻量级的 Nebula-Importer 就足够了。而 Nebula-Importer 能消费的数据只有 CSV 文件,所以我们把刚才的表都输出为文件。

首先,我们进入 postgres 的 console,执行 COPY 命令

参考 Postgres 文档:https://www.postgresql.org/docs/current/sql-copy.html

1
2
3
4
5
6
7
8
COPY acted_by TO '/tmp/acted_by.csv'  WITH DELIMITER ',' CSV HEADER;
COPY directed_by TO '/tmp/directed_by.csv'  WITH DELIMITER ',' CSV HEADER;
COPY with_genre TO '/tmp/with_genre.csv'  WITH DELIMITER ',' CSV HEADER;
COPY people TO '/tmp/people.csv'  WITH DELIMITER ',' CSV HEADER;
COPY movies TO '/tmp/movies.csv'  WITH DELIMITER ',' CSV HEADER;
COPY genres TO '/tmp/genres.csv'  WITH DELIMITER ',' CSV HEADER;
-- 对于 user_watched_movies 我们不输出表头,因为这个文件中记录了两种点、一种边,没法让 importer 通过约定好的表头自动导入,只能通过无表头的情况下指定第几列对应什么字段
COPY user_watched_movies TO '/tmp/user_watched_movies.csv'  WITH DELIMITER ',' CSV;

然后把 postgres 容器里的文件导入到 to_nebulagraph 这个文件夹里:

1
2
mkdir -p to_nebulagraph
docker cp postgres:/tmp/. to_nebulagraph/

6.5 导入 NebulaGraph

6.5.1 创建 NebulaGraph 集群

我们可以用 Nebula-Up 一键搭起一个测试的 NebulaGraph 单机集群,然后参考数据集的 GitHub 仓库,一键导入所需数据:

注:

  • Nebula-UP:https://github.com/wey-gu/nebula-up

  • 数据集仓库:https://github.com/wey-gu/movie-recommendation-dataset

1
curl -fsSL nebula-up.siwei.io/install.sh | bash

6.5.2 创建 Schema

首先,我们创建一个叫做 moviegraph 的图空间,针对前边的建模,创建点边类型的结构(Schema):

先进入 NebulaGraph 的 console:

1
~/.nebula-up/console.sh

然后执行如下 DDL(Data Definiation Language):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
CREATE SPACE moviegraph(partition_num=10,replica_factor=1,vid_type=fixed_string(32));
:sleep 20
USE moviegraph;
CREATE TAG person(name string, birthdate string);
CREATE TAG movie(name string);
CREATE TAG genre(name string);
CREATE TAG user(user_id string);
CREATE EDGE acted_by();
CREATE EDGE directed_by();
CREATE EDGE with_genre();
CREATE EDGE watched(rate float);
exit

6.5.3 创建 Nebula-Importer 配置文件

这个文件是一个描述 CSV 文件和集群中点边数据对应关系的 YAML 文件。详细的格式可以参考文档:https://docs.nebula-graph.com.cn/master/nebula-importer/use-importer/,或者视频教程:https://www.bilibili.com/video/BV1ny4y1u7i4 。

最终的配置文件我已经问大家写好了,在 https://github.com/wey-gu/movie-recommendation-dataset/blob/main/nebula-importer.yaml 可以下载得到。

这里,我们就直接下载我写好了的配置文件,注意,这个文件不应该是 dbt 项目文件的一部分,所以我们退出目录,向上一层,把它放到 dbt_proeject 外边:

1
2
cd ..
wget https://raw.githubusercontent.com/wey-gu/movie-recommendation-dataset/main/nebula-importer.yaml

6.5.4 开始导入

这一步,我们用容器化的 nebula-importer,避免了安装的步骤:

1
2
3
4
5
6
docker run --rm -ti \
    --network=nebula-net \
    -v ${PWD}:/root/ \
    -v ${PWD}/dbt_project/to_nebulagraph/:/data \
    vesoft/nebula-importer:v3.2.0 \
    --config /root/nebula-importer.yaml

很快,所有的数据就导入到 NebulaGraph 之中了,然后我们可以通过 Nebula-Console,执行一些查询看看结果:

进入 console

1
~/.nebula-up/console.sh

进入图空间、执行 SHOW STATS

1
2
USE moviegraph;
SHOW STATS;

结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
(root@nebula) [moviegraph]> SHOW STATS;
+---------+---------------+---------+
| Type    | Name          | Count   |
+---------+---------------+---------+
| "Tag"   | "genre"       | 14397   |
| "Tag"   | "movie"       | 20701   |
| "Tag"   | "person"      | 263907  |
| "Tag"   | "user"        | 610     |
| "Edge"  | "acted_by"    | 673763  |
| "Edge"  | "directed_by" | 101949  |
| "Edge"  | "watched"     | 31781   |
| "Edge"  | "with_genre"  | 194009  |
| "Space" | "vertices"    | 299615  |
| "Space" | "edges"       | 1001502 |
+---------+---------------+---------+
Got 10 rows (time spent 1693/15136 us)

通过 Nebula-Studio,我们也可以在可视化界面探索这个图谱,比如在其中执行这个查询,看一下给用户 u_124 推荐电影 1891 的理由可能是什么?

1
FIND NOLOOP PATH FROM "u_124" TO "1891" over * BIDIRECT UPTO 4 STEPS yield path as `p` | LIMIT 20

他的结果是:曾经喜欢的星战电影的大部分演职人员都也参与了这部和同样是“奥斯卡获奖”且“经典”的电影。

reasoning_movie

我在另一篇文章中给大家用同一个图谱展示了更多图数据库、图算法在推荐系统上的应用,如果大家感兴趣,欢迎阅读:https://www.siwei.io/recommendation-system-with-graphdb/ 。

7 总结

当我们打算把海量数据利用图数据库的能力进行知识转化、洞察分析的时候,往往第一步就是要做多数据源到图数据的转换、处理、建模。对于在无从下手的新手们来说,一个可行的思路是从所有的相关信息出发,去设想最关注的关联关系,把边写出来,然后再罗列可以取得的点、以及需要的点、边上的属性。确定了初始的建模之后,就可以利用 ETL 工具把原始的数据清洗、ETL 成点、边类型的表结构,最后,利用导入工具导入 NebulaGraph。

借助于 dbt,我们可以版本控制、测试、迭代我们的建模与数据转换,一点点进化、丰富构建的知识图谱。

题图版权:Claudio

❌
❌