最近在研究全链路监控的实现方式,目的是计划在项目中加入全链路日志的支持,说到这个问题肯定有人会想到 APM,如:SkyWalking、Cat、Zipkin、Pinpoint 、Elastic APM 等,确实市面上已经存在现成的全链路监控框架可以直接使用,不过说实话,在免费领域 .NET 这方面的实现确是还不够成熟,当然和很多组件本身实现方式也有关,并没有预置埋点。SkyAPM-dotnet 是目前支持组件诊断分析较多的一个实现,如下图:

基于对 SkyWalking 的 SkyAPM-dotnet 和 Elastic APM 的 apm-agent-dotnet 源码阅读,它们本质上都是基于 DiagnosticSource 来实现的诊断跟踪,本身也定义了一套较规范的标准,如果需要实现更多组件的诊断跟踪,基本上是可以直接基于这套标准扩展即可,所以本文就主要介绍 DiagnosticSource 的使用,初步了解实现原理。
DiagnosticSource 是什么
简单来说 DiagnosticSource 一个基于观察者模式的日志模块,日志写入 DiagnosticSource,然后供订阅者消费。DiagnosticSource 只是一个抽象类,它定义了记录事件所需的方法,实际核心的是 DiagnosticListener 实现类,每个 DiagnosticListener 都具有一个 Name 属性(诊断器名),一个应用程序中可包含多个 DiagnosticListener ,每个 DiagnosticListener 有自己唯一的诊断器名标识。 DiagnosticListener 充当发布者角色,通过 Write 向 DiagnosticSource 写入日志,同时提供了 Subscribe 方法设置订阅者来消费 DiagnosticSource 中的日志。
DiagnosticSource 事件发布
在事件发布前需要先创建
DiagnosticSource,如下定义了一个诊断器名为TestDiagnosticListener的DiagnosticListener:1
private static readonly DiagnosticSource testDiagnosticListener = new DiagnosticListener("TestDiagnosticListener");
判断当前诊断器的某个事件名是否存在消费者监听:
1
bool IsEnabled(string name);
携带数据对象写入诊断器 DiagnosticSource 中:
1
void Write(string name, object value);
使用示例:
1
2
3
4if (testDiagnosticListener.IsEnabled("RequestStart"))
{
testDiagnosticListener.Write("RequestStart", "hello world");
}
DiagnosticSource 事件消费
定义
DiagnosticListener事件消费处理接口,实现类中的ListenerName必须与对应DiagnosticListener的诊断器名一致:1
2
3
4public interface IDiagnosticProcessor
{
string ListenerName { get; }
}定义诊断器名为
TestDiagnosticListener的DiagnosticListener事件消费处理逻辑:1
2
3
4
5
6
7
8
9
10public class TestDiagnosticProcessor : IDiagnosticProcessor
{
public string ListenerName { get; } = "TestDiagnosticListener";
[DiagnosticName("RequestStart")]
public void RequestStart([Object]string name)
{
Console.WriteLine(name);
}
}创建
IObserver<DiagnosticListener>实现类订阅所有类型的DiagnosticListener,通过OnNext方法的DiagnosticListener对象获取当前的诊断器名,不同(诊断器名不同)DiagnosticListener发布的事件设置不同的订阅者,主要代码如下(完整代码):1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public class DiagnosticListenerObserver : IObserver<DiagnosticListener>
{
private readonly IEnumerable<IDiagnosticProcessor> _diagnosticProcessors;
public DiagnosticListenerObserver(IEnumerable<IDiagnosticProcessor> diagnosticProcessors)
{
_diagnosticProcessors = diagnosticProcessors;
}
public void OnNext(DiagnosticListener value)
{
var diagnosticProcessor = _diagnosticProcessors?.FirstOrDefault(_ => _.ListenerName == value.Name);
if (diagnosticProcessor == null) return;
value.Subscribe(new DiagnosticEventObserver(diagnosticProcessor));
}
}事件订阅者需要创建基于
IObserver<KeyValuePair<string, object>>的实现类,根据触发的事件名(value.Key)和已订阅的事件处理集合(_eventCollection)进行匹对查找,匹配上的通过反射执行对应的消费方法,主要代码如下(完整代码):1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public class DiagnosticEventObserver : IObserver<KeyValuePair<string, object>>
{
private readonly DiagnosticEventCollection _eventCollection;
public DiagnosticEventObserver(IDiagnosticProcessor diagnosticProcessor)
{
_eventCollection = new DiagnosticEventCollection(diagnosticProcessor);
}
public void OnNext(KeyValuePair<string, object> value)
{
var diagnosticEvent = _eventCollection.GetDiagnosticEvent(value.Key);
if (diagnosticEvent == null) return;
try
{
diagnosticEvent.Invoke(value.Value);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}最后需要通过
DiagnosticListener.AllListeners.Subscribe设置DiagnosticListenerObserver对象;执行结果:

总结
通过以上示例,我们完全可以参考基于这样的标准在组件封装(MongoDB、Dapper、Kafka、Redis …)过程中自己埋点(创建相应的 DiagnosticListener 并发布事件),然后订阅者根据需求监听需要的事件,从而达到诊断日志全链路收集的目的。
注:本文涉及的代码主要是参考了 SkyAPM-dotnet 。