pqarrow.FileWriter.Close() leaks per-column-chunk buffers when the underlying io.Writer fails mid-flush
Describe the bug, including details regarding any error messages, version, and platform.
When pqarrow.FileWriter.Close() runs against an io.Writer that has started failing, it returns the first column-chunk close error and leaves every subsequent column's per-chunk allocator-tracked state stranded. The leak is strictly on the error path: the same code with a working sink ends with CheckedAllocator.CurrentAlloc() == 0. Close() is the writer's only chance to release that state; there is no separate Release() API.
The root cause is in parquet/file/row_group_writer.go::(*rowGroupWriter).Close (v18.5.2 lines 232-254):
```go
for _, wr := range rg.columnWriters {
	if wr != nil {
		if err := wr.Close(); err != nil {
			return err // ← strands every column past this one
		}
		rg.bytesWritten += wr.TotalBytesWritten()
		rg.compressedBytesWritten += wr.TotalCompressedBytes()
	}
}
```
When wr.Close() errors on column N, columns N+1..end never get Close() called, so their currentEncoder and (in the buffered-row-group path) their bufferedPageWriter.inMemSink are never released. The bufferedPageWriter.Close body that does release the in-memory sink (page_writer.go:485) is reached via the per-column pager.Close call inside columnWriter.Close — so skipping a column's Close() also skips releasing its accumulated page bytes.
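The control-flow problem can be reproduced without arrow-go. The sketch below is a toy model, not arrow-go code: `column`, `closeEarlyReturn`, `stranded`, and `errShortWrite` are hypothetical names, and it assumes (as described above for the buffered-row-group path) that a column's Close releases that column's buffers even when its flush fails. With two columns and a failure on column 0, the early-return loop leaves column 1 unreleased, so it prints `error: short write` and `stranded columns: 1`.

```go
package main

import (
	"errors"
	"fmt"
)

// errShortWrite stands in for the sink error (io.ErrShortWrite in the reproducer).
var errShortWrite = errors.New("short write")

// column is a hypothetical stand-in for one column chunk writer. It owns
// buffers that only its Close releases.
type column struct {
	name     string
	failOn   bool // simulate the sink breaking during this column's flush
	released bool
}

// Close releases the column's buffers and then reports whether the flush
// failed (the assumption this sketch makes about the buffered path).
func (c *column) Close() error {
	c.released = true
	if c.failOn {
		return errShortWrite
	}
	return nil
}

// closeEarlyReturn has the same shape as the v18.5.2 rowGroupWriter.Close
// loop: it returns on the first error.
func closeEarlyReturn(cols []*column) error {
	for _, c := range cols {
		if c == nil {
			continue
		}
		if err := c.Close(); err != nil {
			return err // columns after c are never closed
		}
	}
	return nil
}

// stranded counts columns whose buffers were never released.
func stranded(cols []*column) int {
	n := 0
	for _, c := range cols {
		if c != nil && !c.released {
			n++
		}
	}
	return n
}

func main() {
	// Mirrors the reproducer's schema: column 0 ("s") fails, column 1 ("i") follows.
	cols := []*column{{name: "s", failOn: true}, {name: "i"}}
	err := closeEarlyReturn(cols)
	fmt.Println("error:", err)
	fmt.Println("stranded columns:", stranded(cols))
}
```

Moving `failOn` to the last column makes `stranded` return 0 in this model, which is why the failing column must not be last in iteration order for the leak to appear.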
Versions:
- github.com/apache/arrow-go/v18 v18.5.2
- Go 1.26.2
- Verified on darwin/arm64 with the reproducer below. Originally observed on linux/amd64 in production with a different failing writer (a cloud storage authentication failure during a long-lived, multi-GiB write).
Reproducer
go.mod pins github.com/apache/arrow-go/v18 v18.5.2. The program uses a two-column schema, a WriteBuffered loop, and an io.Writer that returns io.ErrShortWrite once it has accepted remaining bytes (512 KiB here):
```go
package main

import (
	"fmt"
	"io"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/parquet"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"
)

// failAfter returns io.ErrShortWrite once it has accepted `remaining`
// bytes. It models any sink that breaks mid-flush (token expiry, broken
// pipe, HTTP 412 precondition failure, ctx cancellation).
type failAfter struct{ remaining int }

func (w *failAfter) Write(p []byte) (int, error) {
	if w.remaining <= 0 {
		return 0, io.ErrShortWrite
	}
	if len(p) <= w.remaining {
		w.remaining -= len(p)
		return len(p), nil
	}
	n := w.remaining
	w.remaining = 0
	return n, io.ErrShortWrite
}

func main() {
	alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "s", Type: arrow.BinaryTypes.String, Nullable: false},
		{Name: "i", Type: arrow.PrimitiveTypes.Int32, Nullable: false},
	}, nil)
	props := parquet.NewWriterProperties(
		parquet.WithAllocator(alloc),
		parquet.WithMaxRowGroupLength(20_000),
	)
	fw, err := pqarrow.NewFileWriter(
		schema, &failAfter{remaining: 512 << 10}, props, pqarrow.DefaultWriterProps())
	if err != nil {
		panic(err)
	}
	for b := range 32 {
		bld := array.NewRecordBuilder(alloc, schema)
		for r := range 25_000 {
			bld.Field(0).(*array.StringBuilder).Append(fmt.Sprintf("v-%d-%d", b, r%50))
			bld.Field(1).(*array.Int32Builder).Append(int32(b*25_000 + r))
		}
		rec := bld.NewRecordBatch()
		werr := fw.WriteBuffered(rec)
		rec.Release()
		bld.Release()
		if werr != nil {
			break
		}
	}
	fmt.Printf("Close error: %v\n", fw.Close())
	fmt.Printf("Leaked: %d bytes\n", alloc.CurrentAlloc())
	alloc.AssertSize(printT{}, 0)
}

// printT provides the Helper/Errorf pair AssertSize expects, so leak
// reports are printed to stdout outside of a test.
type printT struct{}

func (printT) Helper() {}
func (printT) Errorf(f string, args ...any) { fmt.Printf(f+"\n", args...) }
```
Run with:
```shell
mkdir pqarrow-leak-repro && cd pqarrow-leak-repro
go mod init pqarrow-leak-repro
go get github.com/apache/arrow-go/v18@v18.5.2
# (paste main.go above)
ARROW_CHECKED_MAX_RETAINED_FRAMES=20 go run .
```
ARROW_CHECKED_MAX_RETAINED_FRAMES=20 is optional but strongly recommended. Without it, the frame AssertSize retains for each leak is just (*Buffer).ResizeNoShrink, which doesn't reveal where the unreleased state came from; with it, the trace reaches the encoder call site that made the allocation.
Replacing the failAfter with io.Discard returns Leaked: 0 — same code, working sink, no leak.
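The failAfter sink's contract can be checked in isolation with only the standard library. This sketch copies failAfter from main.go unchanged and writes a 600 KiB payload (an arbitrary size chosen to exceed the 512 KiB budget) to it twice and then to io.Discard. It should print `524288 short write` for the first write, `0 short write` once the budget is exhausted, and `614400 <nil>` for io.Discard.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// failAfter is copied from the reproducer's main.go.
type failAfter struct{ remaining int }

func (w *failAfter) Write(p []byte) (int, error) {
	if w.remaining <= 0 {
		return 0, io.ErrShortWrite
	}
	if len(p) <= w.remaining {
		w.remaining -= len(p)
		return len(p), nil
	}
	n := w.remaining
	w.remaining = 0
	return n, io.ErrShortWrite
}

func main() {
	// 600 KiB: an arbitrary size larger than the reproducer's 512 KiB budget.
	payload := bytes.Repeat([]byte{'x'}, 600<<10)

	sink := &failAfter{remaining: 512 << 10}
	n, err := sink.Write(payload)
	fmt.Println("failAfter, first write:", n, err) // accepts 512 KiB, then short write
	n, err = sink.Write(payload)
	fmt.Println("failAfter, second write:", n, err) // budget exhausted: 0 bytes
	n, err = io.Discard.Write(payload)
	fmt.Println("io.Discard:", n, err) // accepts everything
}
```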
Expected behavior
CurrentAlloc() == 0 after FileWriter.Close() regardless of whether the underlying writer succeeded or failed. An error from Close() should not strand allocator-tracked buffers.
Actual behavior
```
Close error: short write
Leaked: 4718592 bytes
LEAK of 131072 bytes FROM
	github.com/apache/arrow-go/v18/arrow/memory.(*Buffer).ResizeNoShrink+4f
		arrow/memory/buffer.go:143
	github.com/apache/arrow-go/v18/parquet/internal/encoding.(*dictEncoder).expandBuffer+2c
		parquet/internal/encoding/encoder.go:164
	github.com/apache/arrow-go/v18/parquet/internal/encoding.(*dictEncoder).addIndex
		parquet/internal/encoding/encoder.go:226
	github.com/apache/arrow-go/v18/parquet/internal/encoding.(*dictEncoder).Put+9c
		parquet/internal/encoding/encoder.go:301
	github.com/apache/arrow-go/v18/parquet/internal/encoding.(*typedDictEncoder[...]).Put
		parquet/internal/encoding/typed_encoder.go:145
	github.com/apache/arrow-go/v18/parquet/file.(*Int32ColumnChunkWriter).writeValues+ab
		parquet/file/column_writer_types.gen.go:183
	... (writeBatch / pqarrow / WriteBuffered)
[36 identical leak sites; total 36 × 131072 = 4,718,592 bytes]
```
The 36 leaks correspond to row groups whose rowGroupWriter.Close was started, errored on column 0's Close(), and never reached column 1's Close() — leaving column 1's dictEncoder.idxBuffer un-released. Production has also seen the same root cause surface as bufferedPageWriter.inMemSink BufferWriter leaks (rooted at serializedPageWriter.WriteDataPage ← bufferedPageWriter.WriteDataPage ← columnWriter.WriteDataPage), which fires when the un-closed column had triggered FallbackToPlain mid-write and accumulated post-fallback page bytes in its inMemSink. The reproducer above shows the dict-encoder surface; both surfaces share the same row-group early-return root cause.
Conditions
The leak fires when, in one row group's close:
- Rows are written with pqarrow.FileWriter.WriteBuffered, i.e. the buffered-row-group path, in which every column of a row group is buffered in memory and flushed together when the row group closes.
- The schema has multiple columns, and the failing column's Close is not the last in iteration order.
- The underlying io.Writer fails while the failing column's bufferedPageWriter.Close is writing to the sink, so rowGroupWriter.Close returns before it reaches the subsequent columns.
Defaults are otherwise sufficient — the only WriterProperties knob the reproducer sets is MaxRowGroupLength(20_000) to amplify the number of row groups closed during the run; the bug fires equally with default MaxRowGroupLength (int64.max) on a larger payload.
Suggested fix
Primary — rowGroupWriter.Close: drain every column unconditionally, capture the first error, return it after the loop.
```go
func (rg *rowGroupWriter) Close() error {
	if !rg.closed {
		rg.closed = true
		if err := rg.checkRowsWritten(); err != nil {
			return err
		}

		var firstErr error
		for _, wr := range rg.columnWriters {
			if wr == nil {
				continue
			}
			if err := wr.Close(); err != nil {
				if firstErr == nil {
					firstErr = err
				}
				continue // keep draining the rest
			}
			rg.bytesWritten += wr.TotalBytesWritten()
			rg.compressedBytesWritten += wr.TotalCompressedBytes()
		}
		rg.columnWriters = nil
		if firstErr != nil {
			return firstErr
		}

		rg.metadata.SetNumRows(rg.nrows)
		rg.metadata.Finish(rg.bytesWritten, rg.ordinal)
	}
	return nil
}
```
This is sufficient to close both leak surfaces in the buffered-row-group path: every column gets Close() called, which calls pager.Close() → bufferedPageWriter.Close, whose own defer buf.Release() then releases the in-memory sink bytes; and the column's own defer (registered during its Close) releases currentEncoder.
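A toy model of the primary fix's loop shape, again with hypothetical types (`chunk`, `closeAll`, `errCol0`, `errCol2`) rather than the real rowGroupWriter: every non-nil chunk is closed, and only the first error is reported once the loop finishes. It should print `error: column 0: short write` followed by `released: true` for chunks 0, 2, and 3.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errCol0 = errors.New("column 0: short write")
	errCol2 = errors.New("column 2: short write")
)

// chunk is a hypothetical stand-in for one column writer in a row group.
type chunk struct {
	err      error // what its flush returns; nil means the sink accepted it
	released bool
}

// Close always releases the chunk's buffers, then reports the flush result.
func (c *chunk) Close() error {
	c.released = true
	return c.err
}

// closeAll follows the suggested rowGroupWriter.Close loop: it closes every
// non-nil chunk and returns the first error only after the loop finishes.
func closeAll(chunks []*chunk) error {
	var firstErr error
	for _, c := range chunks {
		if c == nil {
			continue
		}
		// Close runs unconditionally; only the error bookkeeping is conditional.
		if err := c.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	chunks := []*chunk{{err: errCol0}, nil, {err: errCol2}, {}}
	fmt.Println("error:", closeAll(chunks))
	for i, c := range chunks {
		if c != nil {
			fmt.Printf("chunk %d released: %v\n", i, c.released)
		}
	}
}
```

Returning only the first error keeps the reported error the same value v18.5.2 returns today for the same failure. If reporting every column's failure were preferred, errors.Join (Go 1.20 and later) could collect them instead; that is a design alternative, not part of the suggested fix.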
Secondary — columnWriter.Close: register the cleanup defer before any fallible call (v18.5.2 lines 579-621).
```go
func (w *columnWriter) Close() (err error) {
	if !w.closed {
		w.closed = true
		defer func() {
			w.defLevelSink.Reset(0)
			w.repLevelSink.Reset(0)
			if w.bitsBuffer != nil {
				w.bitsBuffer.Release()
				w.bitsBuffer = nil
			}
			if w.currentEncoder != nil {
				w.currentEncoder.Release()
				w.currentEncoder = nil
			}
		}()

		if w.hasDict && !w.fallbackToNonDict {
			if err = w.WriteDictionaryPage(); err != nil {
				return err
			}
		}
		if err = w.FlushBufferedDataPages(); err != nil {
			return err
		}
		// ... unchanged ...
	}
	return err
}
```
Currently the cleanup defer is registered after WriteDictionaryPage and FlushBufferedDataPages, both of which can return errors. In the buffered-row-group path those calls only mutate in-memory state and don't fail, so this contributes nothing to the reproducer above — but in the SerialRowGroupWriter path (where each WriteDataPage writes straight to the sink), the same defer-after-fallible-call pattern would strand currentEncoder directly. The nil-check on w.currentEncoder becomes load-bearing because the defer can now fire on paths where currentEncoder was never set.
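The ordering issue in columnWriter.Close can be modelled the same way. In this sketch `encoder`, `writeDictPage`, `closeDeferLate`, `closeDeferEarly`, and `errSink` are hypothetical stand-ins, and `writeDictPage` is assumed to fail the way a direct-to-sink write would in the SerialRowGroupWriter path. Registering the cleanup after the fallible call strands the encoder on the error path; registering it first releases the encoder, and the nil check keeps the deferred cleanup safe when no encoder was ever created.

```go
package main

import (
	"errors"
	"fmt"
)

// errSink stands in for a write to a broken sink.
var errSink = errors.New("sink write failed")

// encoder is a hypothetical stand-in for the column's currentEncoder.
type encoder struct{ released bool }

func (e *encoder) Release() { e.released = true }

// writeDictPage is a hypothetical fallible step that writes straight to the
// sink, as pages do in the SerialRowGroupWriter path; here it always fails.
func writeDictPage() error { return errSink }

// closeDeferLate registers cleanup only after the fallible call, matching
// the v18.5.2 ordering.
func closeDeferLate(enc *encoder) error {
	if err := writeDictPage(); err != nil {
		return err // cleanup never registered: enc is stranded
	}
	defer enc.Release()
	return nil
}

// closeDeferEarly registers cleanup first, as in the suggested fix. The nil
// check matters because the defer now runs even if no encoder was created.
func closeDeferEarly(enc *encoder) (err error) {
	defer func() {
		if enc != nil {
			enc.Release()
		}
	}()
	if err = writeDictPage(); err != nil {
		return err
	}
	return nil
}

func main() {
	late := &encoder{}
	err := closeDeferLate(late)
	fmt.Printf("defer after fallible call:  err=%v released=%v\n", err, late.released)

	early := &encoder{}
	err = closeDeferEarly(early)
	fmt.Printf("defer before fallible call: err=%v released=%v\n", err, early.released)

	err = closeDeferEarly(nil)
	fmt.Printf("no encoder created:         err=%v\n", err)
}
```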
Component(s)
Parquet