Skip to content

Commit 9ea1d24

Browse files
authored
update logging for jobmanager. (#2854)
Store all job logs in one directory, prune old job log files in connserver, write a keepalive log line every hour, and remove the SendData log line.
1 parent 8abe3f0 commit 9ea1d24

File tree

7 files changed

+105
-21
lines changed

7 files changed

+105
-21
lines changed

cmd/wsh/cmd/wshcmd-connserver.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ var serverCmd = &cobra.Command{
3838
RunE: serverRun,
3939
}
4040

41+
// Timing settings for the background sweep that prunes old job log files.
const (
	// JobLogRetentionTime is how old a job log file's modification time must
	// be before the sweep removes it.
	JobLogRetentionTime = time.Hour * 48
	// JobLogCleanupDelay is how long the sweep goroutine sleeps before its
	// first pass.
	JobLogCleanupDelay = time.Second * 10
	// JobLogCleanupInterval is the ticker period between later passes.
	JobLogCleanupInterval = time.Hour
)
46+
4147
var connServerRouter bool
4248
var connServerRouterDomainSocket bool
4349
var connServerConnName string
@@ -53,6 +59,61 @@ func init() {
5359
rootCmd.AddCommand(serverCmd)
5460
}
5561

62+
func cleanupOldJobLogs() {
63+
jobDir := wavebase.GetRemoteJobLogDir()
64+
entries, err := os.ReadDir(jobDir)
65+
if err != nil {
66+
return
67+
}
68+
69+
cutoffTime := time.Now().Add(-JobLogRetentionTime)
70+
71+
for _, entry := range entries {
72+
if entry.IsDir() {
73+
continue
74+
}
75+
76+
name := entry.Name()
77+
if !strings.HasSuffix(name, ".log") {
78+
continue
79+
}
80+
81+
info, err := entry.Info()
82+
if err != nil {
83+
continue
84+
}
85+
86+
if info.ModTime().Before(cutoffTime) {
87+
filePath := filepath.Join(jobDir, name)
88+
err := os.Remove(filePath)
89+
if err != nil {
90+
log.Printf("error removing old job log file %s: %v", filePath, err)
91+
} else {
92+
log.Printf("removed old job log file: %s", filePath)
93+
}
94+
}
95+
}
96+
}
97+
98+
func startJobLogCleanup() {
99+
go func() {
100+
defer func() {
101+
panichandler.PanicHandler("startJobLogCleanup", recover())
102+
}()
103+
104+
time.Sleep(JobLogCleanupDelay)
105+
106+
cleanupOldJobLogs()
107+
108+
ticker := time.NewTicker(JobLogCleanupInterval)
109+
defer ticker.Stop()
110+
111+
for range ticker.C {
112+
cleanupOldJobLogs()
113+
}
114+
}()
115+
}
116+
56117
func getRemoteDomainSocketName() string {
57118
homeDir := wavebase.GetHomeDir()
58119
return filepath.Join(homeDir, wavebase.RemoteWaveHomeDirName, wavebase.RemoteDomainSocketBaseName)
@@ -218,6 +279,7 @@ func serverRunRouter() error {
218279
}()
219280
wshremote.RunSysInfoLoop(client, connServerConnName)
220281
}()
282+
startJobLogCleanup()
221283
log.Printf("running server, successfully started")
222284
select {}
223285
}
@@ -324,6 +386,7 @@ func serverRunRouterDomainSocket(jwtToken string) error {
324386
}()
325387
wshremote.RunSysInfoLoop(client, connServerConnName)
326388
}()
389+
startJobLogCleanup()
327390

328391
log.Printf("running server (router-domainsocket mode), successfully started")
329392
select {}
@@ -346,6 +409,7 @@ func serverRunNormal(jwtToken string) error {
346409
}()
347410
wshremote.RunSysInfoLoop(RpcClient, RpcContext.Conn)
348411
}()
412+
startJobLogCleanup()
349413
select {} // run forever
350414
}
351415

pkg/jobmanager/jobmanager.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,17 @@ func SetupJobManager(clientId string, jobId string, publicKeyBytes []byte, jobAu
7979
return fmt.Errorf("failed to daemonize: %w", err)
8080
}
8181

82+
go func() {
83+
defer func() {
84+
panichandler.PanicHandler("JobManager:keepalive", recover())
85+
}()
86+
ticker := time.NewTicker(1 * time.Hour)
87+
defer ticker.Stop()
88+
for range ticker.C {
89+
log.Printf("keepalive: job manager active\n")
90+
}
91+
}()
92+
8293
return nil
8394
}
8495

@@ -365,25 +376,14 @@ func (jm *JobManager) StartStream(msc *MainServerConn) error {
365376
return nil
366377
}
367378

368-
func GetJobSocketPath(jobId string) string {
369-
socketDir := filepath.Join("/tmp", fmt.Sprintf("waveterm-%d", os.Getuid()))
370-
return filepath.Join(socketDir, fmt.Sprintf("%s.sock", jobId))
371-
}
372-
373-
func GetJobFilePath(clientId string, jobId string, extension string) string {
374-
homeDir := wavebase.GetHomeDir()
375-
jobDir := filepath.Join(homeDir, ".waveterm", "jobs", clientId)
376-
return filepath.Join(jobDir, fmt.Sprintf("%s.%s", jobId, extension))
377-
}
378-
379379
func MakeJobDomainSocket(clientId string, jobId string) error {
380380
socketDir := filepath.Join("/tmp", fmt.Sprintf("waveterm-%d", os.Getuid()))
381381
err := os.MkdirAll(socketDir, 0700)
382382
if err != nil {
383383
return fmt.Errorf("failed to create socket directory: %w", err)
384384
}
385385

386-
socketPath := GetJobSocketPath(jobId)
386+
socketPath := wavebase.GetRemoteJobSocketPath(jobId)
387387

388388
os.Remove(socketPath)
389389

pkg/jobmanager/jobmanager_unix.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"path/filepath"
1414
"syscall"
1515

16+
"github.com/wavetermdev/waveterm/pkg/wavebase"
1617
"golang.org/x/sys/unix"
1718
)
1819

@@ -32,7 +33,7 @@ func daemonize(clientId string, jobId string) error {
3233
}
3334
devNull.Close()
3435

35-
logPath := GetJobFilePath(clientId, jobId, "log")
36+
logPath := wavebase.GetRemoteJobFilePath(jobId, "log")
3637
logDir := filepath.Dir(logPath)
3738
err = os.MkdirAll(logDir, 0700)
3839
if err != nil {
@@ -54,6 +55,7 @@ func daemonize(clientId string, jobId string) error {
5455

5556
log.SetOutput(logFile)
5657
log.Printf("job manager daemonized, logging to %s\n", logPath)
58+
log.Printf("job owner clientid: %s\n", clientId)
5759

5860
signal.Ignore(syscall.SIGHUP)
5961

pkg/jobmanager/mainserverconn.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ type routedDataSender struct {
4242
}
4343

4444
func (rds *routedDataSender) SendData(dataPk wshrpc.CommandStreamData) {
45-
log.Printf("SendData: sending seq=%d, len=%d, eof=%t, error=%s, route=%s",
46-
dataPk.Seq, len(dataPk.Data64), dataPk.Eof, dataPk.Error, rds.route)
45+
// log.Printf("SendData: sending seq=%d, len=%d, eof=%t, error=%s, route=%s",
46+
// dataPk.Seq, len(dataPk.Data64), dataPk.Eof, dataPk.Error, rds.route)
4747
err := wshclient.StreamDataCommand(rds.wshRpc, dataPk, &wshrpc.RpcOpts{NoResponse: true, Route: rds.route})
4848
if err != nil {
4949
log.Printf("SendData: error sending stream data: %v\n", err)
@@ -132,4 +132,3 @@ func (msc *MainServerConn) JobInputCommand(ctx context.Context, data wshrpc.Comm
132132
WshCmdJobManager.InputQueue.QueueItem(data.InputSessionId, data.SeqNum, data)
133133
return nil
134134
}
135-

pkg/telemetry/telemetry.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ func mergeActivity(curActivity *telemetrydata.TEventProps, newActivity telemetry
162162
// ignores the timestamp in tevent, and uses the current time
163163
func updateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) error {
164164
eventTs := time.Now()
165-
// compute to 2-hour boundary, and round up to next 2-hour boundary
166-
eventTs = eventTs.Truncate(2 * time.Hour).Add(2 * time.Hour)
165+
// compute to 1-hour boundary, and round up to next 1-hour boundary
166+
eventTs = eventTs.Truncate(time.Hour).Add(time.Hour)
167167

168168
return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error {
169169
// find event that matches this timestamp with event name "app:activity"
@@ -195,7 +195,7 @@ func updateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) err
195195

196196
func TruncateActivityTEventForShutdown(ctx context.Context) error {
197197
nowTs := time.Now()
198-
eventTs := nowTs.Truncate(2 * time.Hour).Add(2 * time.Hour)
198+
eventTs := nowTs.Truncate(time.Hour).Add(time.Hour)
199199
return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error {
200200
// find event that matches this timestamp with event name "app:activity"
201201
uuidStr := tx.GetString(`SELECT uuid FROM db_tevent WHERE ts = ? AND event = ?`, eventTs.UnixMilli(), ActivityEventName)

pkg/wavebase/wavebase.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,3 +435,22 @@ func getSystemSummary(ctx context.Context) string {
435435
return fmt.Sprintf("%s (%s)", runtime.GOOS, runtime.GOARCH)
436436
}
437437
}
438+
439+
// GetRemoteJobSocketPath returns the job socket path on the remote machine for
// jobId: "<jobId>.sock" inside the per-user directory /tmp/waveterm-<uid>,
// where <uid> is the value of os.Getuid().
func GetRemoteJobSocketPath(jobId string) string {
	userDirName := fmt.Sprintf("waveterm-%d", os.Getuid())
	sockFileName := fmt.Sprintf("%s.sock", jobId)
	return filepath.Join("/tmp", userDirName, sockFileName)
}
444+
445+
// job file path on remote machine
446+
func GetRemoteJobFilePath(jobId string, extension string) string {
447+
jobDir := GetRemoteJobLogDir()
448+
return filepath.Join(jobDir, fmt.Sprintf("%s.%s", jobId, extension))
449+
}
450+
451+
// job file dir on remote machines
452+
func GetRemoteJobLogDir() string {
453+
homeDir := GetHomeDir()
454+
jobDir := filepath.Join(homeDir, ".waveterm", "jobs")
455+
return jobDir
456+
}

pkg/wshrpc/wshremote/wshremote_job.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
"time"
1818

1919
"github.com/shirou/gopsutil/v4/process"
20-
"github.com/wavetermdev/waveterm/pkg/jobmanager"
20+
"github.com/wavetermdev/waveterm/pkg/wavebase"
2121
"github.com/wavetermdev/waveterm/pkg/wshrpc"
2222
"github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient"
2323
"github.com/wavetermdev/waveterm/pkg/wshutil"
@@ -43,7 +43,7 @@ func isProcessRunning(pid int, pidStartTs int64) (*process.Process, error) {
4343

4444
// returns jobRouteId, cleanupFunc, error
4545
func (impl *ServerImpl) connectToJobManager(ctx context.Context, jobId string, mainServerJwtToken string) (string, func(), error) {
46-
socketPath := jobmanager.GetJobSocketPath(jobId)
46+
socketPath := wavebase.GetRemoteJobSocketPath(jobId)
4747
log.Printf("connectToJobManager: connecting to socket: %s\n", socketPath)
4848
conn, err := net.Dial("unix", socketPath)
4949
if err != nil {

0 commit comments

Comments
 (0)