Skip to content

feat:Add KafkaSerializer with unit tests #2297

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

ChaoEcho
Copy link

Add KafkaSerializer with unit tests to support serializing events to JSON format. Implement conversion for logs, metrics, spans, and raw events, including label transformation logic for Kubernetes environments.

@CLAassistant
Copy link

CLAassistant commented Jul 10, 2025

CLA assistant check
All committers have signed the CLA.

…s to JSON format.

Implement conversion for logs, metrics, spans, and raw events, including label transformation logic for Kubernetes environments.
@ChaoEcho ChaoEcho force-pushed the feat/task-1-serializer branch from 572870a to 37872bb Compare July 10, 2025 05:08

const string TAG_PREFIX = "__tag__:";

const unordered_map<string, string> TAG_CONVERSION_MAP = {{"__path__", "log.file.path"},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C++的tag转换在其他地方,这里不需要考虑这些

Comment on lines 106 to 136
const bool isK8s = getenv("KUBERNETES_SERVICE_HOST") != nullptr;
for (const auto& tag : group.mTags.mInner) {
const string key = tag.first.to_string();
if (key.empty()) {
continue;
}

string convertedKey = key;
bool converted = false;

// Try K8s-specific conversion first
if (isK8s) {
auto it = SPECIAL_TAG_CONVERSION_MAP.find(key);
if (it != SPECIAL_TAG_CONVERSION_MAP.end()) {
convertedKey = it->second;
converted = true;
}
}

// Try general conversion if not already converted
if (!converted) {
auto it = TAG_CONVERSION_MAP.find(key);
if (it != TAG_CONVERSION_MAP.end()) {
convertedKey = it->second;
}
}

if (!convertedKey.empty()) {
item.tags[convertedKey] = tag.second.to_string();
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C++这边不需要

}

vector<CustomSingleLogItem> items;
ConvertToCustomSingleItems(group, items);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PipelineEventGroup可以直接序列化成json,为什么还需要中间这个结果?

}


bool KafkaEventGroupSerializer::SerializeToJSON(const vector<CustomSingleLogItem>& items, string& output) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以参考一下PipelineEventGroup::ToJson,这个是单测里用的。这两种序列化哪种性能更好?

@shalousun
Copy link
Collaborator

很多通用处理不应该定义的时候和kafka绑定,go里面的这块协议转换实际上kafka、pulsar、es可能都在用

- Implemented serialization for logs, metrics, spans, and raw events
- Added corresponding unit tests
- Removed KafkaSerializer-related code to simplify serialization logic
Comment on lines 33 to 37
const string PROTOCOL_KEY_TIME = "time";
const string PROTOCOL_KEY_CONTENT = "contents";
const string PROTOCOL_KEY_TAG = "tags";

const string TAG_PREFIX = "__tag__:";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kXxxx这种命名规则

Comment on lines 68 to 71
if (kv.first.size() >= TAG_PREFIX.length()
&& strncmp(kv.first.data(), TAG_PREFIX.c_str(), TAG_PREFIX.length()) == 0) {
continue;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C++这边的tag一定在tag里面,不用这段特殊处理

Comment on lines 160 to 167
if (kv.first.size() > TAG_PREFIX.length()
&& strncmp(kv.first.data(), TAG_PREFIX.c_str(), TAG_PREFIX.length()) == 0) {
StringView tagName(kv.first.data() + TAG_PREFIX.length(), kv.first.size() - TAG_PREFIX.length());
if (tagName != "__user_defined_id__" && !tagName.empty()) {
writer.Key(tagName.data(), tagName.size());
writer.String(kv.second.data(), kv.second.size());
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上,不用特殊处理

Comment on lines 170 to 174
const auto& metricEvent = event.Cast<MetricEvent>();
for (auto it = metricEvent.TagsBegin(); it != metricEvent.TagsEnd(); ++it) {
writer.Key(it->first.data(), it->first.size());
writer.String(it->second.data(), it->second.size());
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里不太对。你上面加的tag是group级别的,metric这里的tag是event级别的。不能加到一起。

Comment on lines 20 to 22
#ifdef _WIN32
#include <windows.h>
#endif
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个是干什么的?

- rename protocol key constants
- add support for metric_tag
@ChaoEcho
Copy link
Author

JSON序列化规范

1. 通用JSON结构

无论内部事件类型如何,所有事件在序列化后都遵循统一的顶层JSON结构。这个结构包含三个固定的键:time, contents, 和 tags

{
  "time": 1678886400,
  "contents": {
    // ...
  },
  "tags": {
    // ...
  }
}
  • time: 事件时间戳(Unix秒),number类型。
  • contents: 一个JSON object,包含了事件的核心数据。其内部结构因事件类型而异。
  • tags: 一个JSON object,包含了整个事件组的元数据标签。

2. 事件类型序列化详解

2.1 LogEvent (日志事件)

序列化前:

  • 核心数据是一个map<string, string>,包含多个键值对。

序列化后:

{
  "time": 1678886400,
  "contents": {
    "message": "user login successful",
    "user_id": "1001",
    "level": "info"
  },
  "tags": {
    "source": "/var/log/app.log",
    "host": "web-server-01"
  }
}

2.2 MetricEvent (指标事件)

序列化前:

  • 指标名 name (string)
  • 指标值 value (double 或 map<string, double>)
  • 事件级标签 tags (map<string, string>)

序列化后:

{
  "time": 1678886401,
  "contents": {
    "__name__": "cpu.usage.percent",
    "__value__": "85.5",
    "metric_tags": {
      "cpu": "0"
    }
  },
  "tags": {
    "region": "cn-hangzhou",
    "host": "metric-server-02"
  }
}

2.3 SpanEvent (链路追踪事件)

序列化前:

  • traceId, spanId, parentSpanId, name (string)
  • startTimeNs, endTimeNs (uint64_t, 纳秒)
  • 事件级标签 attributes (map<string, string>)
  • 范围标签 scope (map<string, string>)

序列化后:

{
  "time": 1678886402,
  "contents": {
    "trace_id": "7a3f1c2b4d5e6f7a",
    "span_id": "9b8c7d6e5f4a3b2c",
    "parent_span_id": "1a2b3c4d5e6f7a8b",
    "name": "GET /api/users",
    "start_time": "1234567890123456789", 
    "end_time": "1234567890123457890",  
    "duration": "1101",                 
    "attributes": {
      "http.method": "GET",
      "http.status_code": "200"
    },
    "scope": {
      "service.name": "user-service"
    }
  },
  "tags": {
    "env": "production",
    "region": "us-west-2"
  }
}

2.4 RawEvent (原始事件)

序列化前:

  • 核心数据是一个 content (string)。

序列化后:

{
  "time": 1678886403,
  "contents": {
    "content": "this is a raw log line"
  },
  "tags": {
    "source": "/var/log/raw.log",
    "host": "server-03"
  }
}

- Remove unused TestHostTags tests
- Simplify event creation function parameters
- Ensure events always include required tags
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants