Skip to content

Commit 91459b2

Browse files
authored
feat(connect): support connect openTelemetry and log for 1.6 (#2961) (#3012)
Extract metrics and logging features as standalone modules.
1 parent fb43893 commit 91459b2

File tree

94 files changed

+6762
-1388
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

94 files changed

+6762
-1388
lines changed

automq-log-uploader/README.md

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
# AutoMQ Log Uploader Module
2+
3+
This module provides asynchronous S3 log upload capability based on Log4j 1.x. Other submodules only need to depend on this module and configure it simply to synchronize logs to object storage. Core components:
4+
5+
- `com.automq.log.S3RollingFileAppender`: Extends `RollingFileAppender`, pushes log events to the uploader while writing to local files.
6+
- `com.automq.log.uploader.LogUploader`: Asynchronously buffers, compresses, and uploads logs; supports configuration switches and periodic cleanup.
7+
- `com.automq.log.uploader.S3LogConfig`: Interface that abstracts the configuration required for uploading. Implementations must provide cluster ID, node ID, object storage instance, and leadership status.
8+
9+
## Quick Integration
10+
11+
1. Add dependency in your module's `build.gradle`:
12+
```groovy
13+
implementation project(':automq-log-uploader')
14+
```
15+
2. Implement or provide an `S3LogConfig` instance and configure the appender:
16+
17+
```java
18+
// Set up the S3LogConfig through your application
19+
S3LogConfig config = // your S3LogConfig implementation
20+
S3RollingFileAppender.setup(config);
21+
```
22+
3. Reference the Appender in `log4j.properties`:
23+
24+
```properties
25+
log4j.appender.s3_uploader=com.automq.log.S3RollingFileAppender
26+
log4j.appender.s3_uploader.File=logs/server.log
27+
log4j.appender.s3_uploader.MaxFileSize=100MB
28+
log4j.appender.s3_uploader.MaxBackupIndex=10
29+
log4j.appender.s3_uploader.layout=org.apache.log4j.PatternLayout
30+
log4j.appender.s3_uploader.layout.ConversionPattern=[%d] %p %m (%c)%n
31+
```
32+
33+
## S3LogConfig Interface
34+
35+
The `S3LogConfig` interface provides the configuration needed for log uploading:
36+
37+
```java
38+
public interface S3LogConfig {
39+
boolean isEnabled(); // Whether S3 upload is enabled
40+
String clusterId(); // Cluster identifier
41+
int nodeId(); // Node identifier
42+
ObjectStorage objectStorage(); // S3 object storage instance
43+
boolean isLeader(); // Whether this node should upload logs
44+
}
45+
```
46+
47+
48+
The upload schedule can be overridden by environment variables:
49+
50+
- `AUTOMQ_OBSERVABILITY_UPLOAD_INTERVAL`: Maximum upload interval (milliseconds).
51+
- `AUTOMQ_OBSERVABILITY_CLEANUP_INTERVAL`: Retention period (milliseconds), old objects earlier than this time will be cleaned up.
52+
53+
## Implementation Notes
54+
55+
### Leader Selection
56+
57+
The log uploader relies on the `S3LogConfig.isLeader()` method to determine whether the current node should upload logs and perform cleanup tasks. This avoids multiple nodes in a cluster simultaneously executing these operations.
58+
59+
### Object Storage Path
60+
61+
Logs are uploaded to object storage following this path pattern:
62+
```
63+
automq/logs/{clusterId}/{nodeId}/{hour}/{uuid}
64+
```
65+
66+
Where:
67+
- `clusterId` and `nodeId` come from the S3LogConfig
68+
- `hour` is the timestamp hour for log organization
69+
- `uuid` is a unique identifier for each log batch
70+
71+
## Usage Example
72+
73+
Complete example of using the log uploader:
74+
75+
```java
76+
import com.automq.log.S3RollingFileAppender;
77+
import com.automq.log.uploader.S3LogConfig;
78+
import com.automq.stream.s3.operator.ObjectStorage;
79+
80+
// Implement S3LogConfig
81+
public class MyS3LogConfig implements S3LogConfig {
82+
@Override
83+
public boolean isEnabled() {
84+
return true; // Enable S3 upload
85+
}
86+
87+
@Override
88+
public String clusterId() {
89+
return "my-cluster";
90+
}
91+
92+
@Override
93+
public int nodeId() {
94+
return 1;
95+
}
96+
97+
@Override
98+
public ObjectStorage objectStorage() {
99+
// Return your ObjectStorage instance
100+
return myObjectStorage;
101+
}
102+
103+
@Override
104+
public boolean isLeader() {
105+
// Return true if this node should upload logs
106+
return isCurrentNodeLeader();
107+
}
108+
}
109+
110+
// Setup and use
111+
S3LogConfig config = new MyS3LogConfig();
112+
S3RollingFileAppender.setup(config);
113+
114+
// Configure Log4j to use the appender
115+
// The appender will now automatically upload logs to S3
116+
```
117+
118+
## Lifecycle Management
119+
120+
Remember to properly shutdown the log uploader when your application terminates:
121+
122+
```java
123+
// During application shutdown
124+
S3RollingFileAppender.shutdown();
125+
```
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2025, AutoMQ HK Limited.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package com.automq.log;
21+
22+
import com.automq.log.uploader.LogRecorder;
23+
import com.automq.log.uploader.LogUploader;
24+
import com.automq.log.uploader.S3LogConfig;
25+
26+
import org.apache.log4j.RollingFileAppender;
27+
import org.apache.log4j.spi.LoggingEvent;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
public class S3RollingFileAppender extends RollingFileAppender {
32+
33+
private static final Logger LOGGER = LoggerFactory.getLogger(S3RollingFileAppender.class);
34+
private static final Object INIT_LOCK = new Object();
35+
36+
private static volatile LogUploader logUploaderInstance;
37+
private static volatile S3LogConfig s3LogConfig;
38+
39+
public S3RollingFileAppender() {
40+
super();
41+
}
42+
43+
public static void setup(S3LogConfig config) {
44+
s3LogConfig = config;
45+
synchronized (INIT_LOCK) {
46+
if (logUploaderInstance != null) {
47+
return;
48+
}
49+
try {
50+
if (s3LogConfig == null) {
51+
LOGGER.error("No s3LogConfig available; S3 log upload remains disabled.");
52+
throw new RuntimeException("S3 log configuration is missing.");
53+
}
54+
if (!s3LogConfig.isEnabled() || s3LogConfig.objectStorage() == null) {
55+
LOGGER.warn("S3 log upload is disabled by configuration.");
56+
return;
57+
}
58+
59+
LogUploader uploader = new LogUploader();
60+
uploader.start(s3LogConfig);
61+
logUploaderInstance = uploader;
62+
LOGGER.info("S3RollingFileAppender initialized successfully using s3LogConfig {}.", s3LogConfig.getClass().getName());
63+
} catch (Exception e) {
64+
LOGGER.error("Failed to initialize S3RollingFileAppender", e);
65+
throw e;
66+
}
67+
}
68+
}
69+
70+
public static void shutdown() {
71+
if (logUploaderInstance != null) {
72+
synchronized (INIT_LOCK) {
73+
if (logUploaderInstance != null) {
74+
try {
75+
logUploaderInstance.close();
76+
logUploaderInstance = null;
77+
LOGGER.info("S3RollingFileAppender log uploader closed successfully.");
78+
} catch (Exception e) {
79+
LOGGER.error("Failed to close S3RollingFileAppender log uploader", e);
80+
}
81+
}
82+
}
83+
}
84+
}
85+
86+
@Override
87+
protected void subAppend(LoggingEvent event) {
88+
super.subAppend(event);
89+
if (!closed && logUploaderInstance != null) {
90+
LogRecorder.LogEvent logEvent = new LogRecorder.LogEvent(
91+
event.getTimeStamp(),
92+
event.getLevel().toString(),
93+
event.getLoggerName(),
94+
event.getRenderedMessage(),
95+
event.getThrowableStrRep());
96+
97+
try {
98+
logEvent.validate();
99+
logUploaderInstance.append(logEvent);
100+
} catch (IllegalArgumentException e) {
101+
errorHandler.error("Failed to validate and append log event", e, 0);
102+
}
103+
}
104+
}
105+
}

automq-shell/src/main/java/com/automq/shell/log/LogRecorder.java renamed to automq-log-uploader/src/main/java/com/automq/log/uploader/LogRecorder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* limitations under the License.
1818
*/
1919

20-
package com.automq.shell.log;
20+
package com.automq.log.uploader;
2121

2222
import org.apache.commons.lang3.StringUtils;
2323

0 commit comments

Comments
 (0)