From 6d8565d72ecb00e711481ac8abe43343d06a6e7b Mon Sep 17 00:00:00 2001 From: Leonardo Canello Date: Wed, 3 Jun 2026 15:25:43 +0200 Subject: [PATCH] fix(loki): prevent duplicate log ingestion and improve timestamp handling Remove the 'else if infinite' branch in updateURI() that reset the query start timestamp backward by 1 minute when no new results were returned. This caused the same Loki log entries to be re-fetched and ingested multiple times, inflating event counts and triggering false bucket overflows. Also improve robustness: - Scan all streams for maxTS instead of only the first stream's last entry - Use strconv.ParseInt/FormatInt for nanosecond timestamps to avoid 32-bit Atoi overflow on platforms where int is 32-bit - Align entry.go timestamp parsing with types.go (time.Unix(0, t)) Fixes duplicate log lines appearing 2-3x in CrowdSec alerts when using Loki datasource in streaming mode. --- pkg/acquisition/modules/loki/entry.go | 4 ++-- .../loki/internal/lokiclient/loki_client.go | 24 +++++++++++++------ .../modules/loki/internal/lokiclient/types.go | 4 ++-- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/pkg/acquisition/modules/loki/entry.go b/pkg/acquisition/modules/loki/entry.go index ab7941d570e..09f81413ca2 100644 --- a/pkg/acquisition/modules/loki/entry.go +++ b/pkg/acquisition/modules/loki/entry.go @@ -23,12 +23,12 @@ func (e *Entry) UnmarshalJSON(b []byte) error { return fmt.Errorf("invalid Loki entry: expected [timestamp, line], got %v", values) } - t, err := strconv.Atoi(values[0]) + t, err := strconv.ParseInt(values[0], 10, 64) if err != nil { return err } - e.Timestamp = time.Unix(int64(t), 0) + e.Timestamp = time.Unix(t, 0) e.Line = values[1] return nil diff --git a/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go b/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go index 83486c0c689..65ce224ac80 100644 --- a/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go +++ b/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go @@ -55,14 +55,24 @@ func updateURI(uri string, lq LokiQueryRangeResponse, infinite bool) (string, er } queryParams := u.Query() - if len(lq.Data.Result) > 0 { - lastTS := lq.Data.Result[0].Entries[len(lq.Data.Result[0].Entries)-1].Timestamp + var maxTS time.Time + for _, stream := range lq.Data.Result { + for _, entry := range stream.Entries { + if entry.Timestamp.After(maxTS) { + maxTS = entry.Timestamp + } + } + } + + if !maxTS.IsZero() { // +1 the last timestamp to avoid getting the same result again. - queryParams.Set("start", strconv.Itoa(int(lastTS.UnixNano()+1))) + queryParams.Set("start", strconv.FormatInt(maxTS.UnixNano()+1, 10)) } + // When maxTS.IsZero() (no results), keep the existing start to avoid + // re-fetching already-processed logs. Only end is moved forward below. if infinite { - queryParams.Set("end", strconv.Itoa(int(time.Now().UnixNano()))) + queryParams.Set("end", strconv.FormatInt(time.Now().UnixNano(), 10)) } u.RawQuery = queryParams.Encode() @@ -244,7 +254,7 @@ func (lc *LokiClient) Tail(ctx context.Context) (chan *LokiResponse, error) { dialer := &websocket.Dialer{} u := lc.getURLFor("loki/api/v1/tail", map[string]string{ "limit": strconv.Itoa(lc.config.Limit), - "start": strconv.Itoa(int(time.Now().Add(-lc.config.Since).UnixNano())), + "start": strconv.FormatInt(time.Now().Add(-lc.config.Since).UnixNano(), 10), "query": lc.config.Query, "delay_for": strconv.Itoa(lc.config.DelayFor), }) @@ -294,8 +304,8 @@ func (lc *LokiClient) Tail(ctx context.Context) (chan *LokiResponse, error) { func (lc *LokiClient) QueryRange(ctx context.Context, infinite bool) chan *LokiQueryRangeResponse { url := lc.getURLFor("loki/api/v1/query_range", map[string]string{ "query": lc.config.Query, - "start": strconv.Itoa(int(time.Now().Add(-lc.config.Since).UnixNano())), - "end": strconv.Itoa(int(time.Now().UnixNano())), + "start": strconv.FormatInt(time.Now().Add(-lc.config.Since).UnixNano(), 10), + "end": strconv.FormatInt(time.Now().UnixNano(), 10), "limit": strconv.Itoa(lc.config.Limit), "direction": "forward", }) diff --git a/pkg/acquisition/modules/loki/internal/lokiclient/types.go b/pkg/acquisition/modules/loki/internal/lokiclient/types.go index 618c03c8bff..d73ae287fad 100644 --- a/pkg/acquisition/modules/loki/internal/lokiclient/types.go +++ b/pkg/acquisition/modules/loki/internal/lokiclient/types.go @@ -23,11 +23,11 @@ func (e *Entry) UnmarshalJSON(b []byte) error { return fmt.Errorf("invalid Loki entry: expected [timestamp, line], got %v", values) } - t, err := strconv.Atoi(values[0]) + t, err := strconv.ParseInt(values[0], 10, 64) if err != nil { return err } - e.Timestamp = time.Unix(0, int64(t)) + e.Timestamp = time.Unix(0, t) e.Line = values[1] return nil }