diff --git a/pkg/acquisition/modules/loki/entry.go b/pkg/acquisition/modules/loki/entry.go index ab7941d570e..09f81413ca2 100644 --- a/pkg/acquisition/modules/loki/entry.go +++ b/pkg/acquisition/modules/loki/entry.go @@ -23,12 +23,12 @@ func (e *Entry) UnmarshalJSON(b []byte) error { return fmt.Errorf("invalid Loki entry: expected [timestamp, line], got %v", values) } - t, err := strconv.Atoi(values[0]) + t, err := strconv.ParseInt(values[0], 10, 64) if err != nil { return err } - e.Timestamp = time.Unix(int64(t), 0) + e.Timestamp = time.Unix(t, 0) e.Line = values[1] return nil diff --git a/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go b/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go index 83486c0c689..65ce224ac80 100644 --- a/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go +++ b/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go @@ -55,14 +55,24 @@ func updateURI(uri string, lq LokiQueryRangeResponse, infinite bool) (string, er } queryParams := u.Query() - if len(lq.Data.Result) > 0 { - lastTS := lq.Data.Result[0].Entries[len(lq.Data.Result[0].Entries)-1].Timestamp + var maxTS time.Time + for _, stream := range lq.Data.Result { + for _, entry := range stream.Entries { + if entry.Timestamp.After(maxTS) { + maxTS = entry.Timestamp + } + } + } + + if !maxTS.IsZero() { // +1 the last timestamp to avoid getting the same result again. - queryParams.Set("start", strconv.Itoa(int(lastTS.UnixNano()+1))) + queryParams.Set("start", strconv.FormatInt(maxTS.UnixNano()+1, 10)) } + // When maxTS.IsZero() (no results), keep the existing start to avoid + // re-fetching already-processed logs. Only end is moved forward below. if infinite { - queryParams.Set("end", strconv.Itoa(int(time.Now().UnixNano()))) + queryParams.Set("end", strconv.FormatInt(time.Now().UnixNano(), 10)) } u.RawQuery = queryParams.Encode() @@ -244,7 +254,7 @@ func (lc *LokiClient) Tail(ctx context.Context) (chan *LokiResponse, error) { dialer := &websocket.Dialer{} u := lc.getURLFor("loki/api/v1/tail", map[string]string{ "limit": strconv.Itoa(lc.config.Limit), - "start": strconv.Itoa(int(time.Now().Add(-lc.config.Since).UnixNano())), + "start": strconv.FormatInt(time.Now().Add(-lc.config.Since).UnixNano(), 10), "query": lc.config.Query, "delay_for": strconv.Itoa(lc.config.DelayFor), }) @@ -294,8 +304,8 @@ func (lc *LokiClient) Tail(ctx context.Context) (chan *LokiResponse, error) { func (lc *LokiClient) QueryRange(ctx context.Context, infinite bool) chan *LokiQueryRangeResponse { url := lc.getURLFor("loki/api/v1/query_range", map[string]string{ "query": lc.config.Query, - "start": strconv.Itoa(int(time.Now().Add(-lc.config.Since).UnixNano())), - "end": strconv.Itoa(int(time.Now().UnixNano())), + "start": strconv.FormatInt(time.Now().Add(-lc.config.Since).UnixNano(), 10), + "end": strconv.FormatInt(time.Now().UnixNano(), 10), "limit": strconv.Itoa(lc.config.Limit), "direction": "forward", }) diff --git a/pkg/acquisition/modules/loki/internal/lokiclient/types.go b/pkg/acquisition/modules/loki/internal/lokiclient/types.go index 618c03c8bff..d73ae287fad 100644 --- a/pkg/acquisition/modules/loki/internal/lokiclient/types.go +++ b/pkg/acquisition/modules/loki/internal/lokiclient/types.go @@ -23,11 +23,11 @@ func (e *Entry) UnmarshalJSON(b []byte) error { return fmt.Errorf("invalid Loki entry: expected [timestamp, line], got %v", values) } - t, err := strconv.Atoi(values[0]) + t, err := strconv.ParseInt(values[0], 10, 64) if err != nil { return err } - e.Timestamp = time.Unix(0, int64(t)) + e.Timestamp = time.Unix(0, t) e.Line = values[1] return nil }