Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions api/log/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,30 +169,37 @@ func (c *Client) TailQuery(ctx context.Context, delayFor time.Duration, out outp
tailResponse := new(loghttp.TailResponse)
err := unmarshal.ReadTailResponseJSON(tailResponse, conn)
if err != nil {
if ctx.Err() != nil {
return nil
}

// Check if the websocket connection closed unexpectedly. If so, retry.
// The connection might close unexpectedly if the querier handling the tail request
// in Loki stops running. The following error would be printed:
// "websocket: close 1006 (abnormal closure): unexpected EOF"
if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) {
if retryableTailError(err) {
// Close previous connection. If it fails to close the connection it should be fine as it is already broken.
_ = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))

// Try to re-establish the connection up to 5 times.
backoff := backoff.New(context.Background(), backoff.Config{
reconnectBackoff := backoff.New(ctx, backoff.Config{
MinBackoff: 1 * time.Second,
MaxBackoff: 10 * time.Second,
MaxRetries: 5,
})

for backoff.Ongoing() {
for reconnectBackoff.Ongoing() {
conn, err = c.LiveTailQueryConn(ctx, q.QueryString, delayFor, q.Limit, lastReceivedTimestamp, q.Quiet)
if err == nil {
break
}
backoff.Wait()
reconnectBackoff.Wait()
}

if err = backoff.Err(); err != nil {
if err = reconnectBackoff.Err(); err != nil {
if ctx.Err() != nil {
return nil
}
return fmt.Errorf("error recreating tailing connection: %w", err)
}

Expand All @@ -214,3 +221,12 @@ func (c *Client) TailQuery(ctx context.Context, delayFor time.Duration, out outp
}
}
}

func retryableTailError(err error) bool {
return websocket.IsCloseError(
err,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure,
websocket.CloseInternalServerErr,
)
}
3 changes: 3 additions & 0 deletions api/log/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func lokiTailHandler(t *testing.T, timestamp time.Time, lines []string) http.Han
if v == int(req.Limit) {
break
}
if !timestamp.After(req.Start) {
continue
}
entries = append(entries, logproto.Entry{
Timestamp: timestamp,
Line: line,
Expand Down
4 changes: 4 additions & 0 deletions logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func (cmd *logsCmd) Run(
}

if cmd.Follow {
if err := client.Log.QueryRange(ctx, out, query); err != nil {
return err
}
query.Start = end
return client.Log.TailQuery(ctx, 0, out, query)
}

Expand Down
Loading