@@ -2,7 +2,8 @@ import { ReactNode } from 'react'
22import Page , { PageProps } from './Page.js'
33import Welcome from './Welcome.js'
44
5- import { DataFrame , asyncRows , rowCache , sortableDataFrame } from 'hightable'
5+ import type { DataFrame , DataFrameEvents , ResolvedValue , UnsortableDataFrame } from 'hightable'
6+ import { createEventTarget , sortableDataFrame } from 'hightable'
67import { icebergListVersions , icebergMetadata , icebergRead } from 'icebird'
78import type { Snapshot , TableMetadata } from 'icebird/src/types.js'
89import { useCallback , useEffect , useState } from 'react'
@@ -11,7 +12,10 @@ import Layout from './Layout.js'
1112const empty : DataFrame = {
1213 header : [ ] ,
1314 numRows : 0 ,
14- rows : ( ) => [ ] ,
15+ eventTarget : createEventTarget < DataFrameEvents > ( ) ,
16+ getRowNumber : ( ) => undefined ,
17+ getCell : ( ) => undefined ,
18+ fetch : ( ) => Promise . resolve ( undefined ) ,
1519}
1620
1721export default function App ( ) : ReactNode {
@@ -56,7 +60,7 @@ export default function App(): ReactNode {
5660 const metadataFileName = `${ version } .metadata.json`
5761 icebergMetadata ( { tableUrl : tableUrl , metadataFileName } ) . then ( ( metadata : TableMetadata ) => {
5862 const df = icebergDataFrame ( tableUrl , metadataFileName , metadata )
59- setPageProps ( { df, metadata, versions, version, setVersion, setError } )
63+ setPageProps ( { df, metadata, versions, version, setVersion, setError : setUnknownError } )
6064 } ) . catch ( setUnknownError )
6165 } , [ tableUrl , versions , version , setUnknownError ] )
6266
@@ -86,13 +90,137 @@ function icebergDataFrame(tableUrl: string, metadataFileName: string, metadata:
8690 const schema = metadata . schemas . find ( s => s [ 'schema-id' ] === currentSchemaId )
8791 if ( ! schema ) throw new Error ( 'Current schema not found in metadata' )
8892 const header = schema . fields . map ( f => f . name )
89- return sortableDataFrame ( rowCache ( {
93+ const eventTarget = createEventTarget < DataFrameEvents > ( )
94+
95+ type CachedValue < T > = {
96+ kind : 'fetched'
97+ value : ResolvedValue < T >
98+ } | {
99+ kind : 'fetching'
100+ } | undefined
101+
102+ const rowNumberCache : CachedValue < number > [ ] = [ ]
103+ const cellCache = new Map < string , CachedValue < unknown > [ ] > ( )
104+ header . forEach ( column => cellCache . set ( column , [ ] ) )
105+
106+ function getRowNumber ( { row } : { row : number } ) : ResolvedValue < number > | undefined {
107+ validateRow ( { row, data : { numRows } } )
108+ const cachedValue = rowNumberCache [ row ]
109+ return cachedValue ?. kind === 'fetched' ? cachedValue . value : undefined
110+ }
111+ function getCell ( { row, column } : { row : number , column : string } ) : ResolvedValue < unknown > | undefined {
112+ validateRow ( { row, data : { numRows } } )
113+ validateColumn ( { column, data : { header } } )
114+ const cachedValue = cellCache . get ( column ) ?. [ row ]
115+ return cachedValue ?. kind === 'fetched' ? cachedValue . value : undefined
116+ }
117+ function isCachedOrFetching ( { row, columns } : { row : number , columns ?: string [ ] } ) : boolean {
118+ return rowNumberCache [ row ] !== undefined && ( ! columns || columns . length === 0 || columns . every ( column => cellCache . get ( column ) ?. [ row ] !== undefined ) )
119+ }
120+
121+ // TODO: fetch by row groups, to avoid fetching row by row when we scroll
122+
123+ const unsortableDataFrame : UnsortableDataFrame = {
90124 header,
91125 numRows,
92- rows ( { start, end } ) {
93- const rows = icebergRead ( { tableUrl, metadataFileName, metadata, rowStart : start , rowEnd : end } )
94- . then ( rows => rows . map ( ( cells , index ) => ( { cells, index : start + index } ) ) )
95- return asyncRows ( rows , end - start , header )
126+ eventTarget,
127+ getRowNumber,
128+ getCell,
129+ async fetch ( { rowStart, rowEnd, columns, signal } ) {
130+ validateFetchParams ( { rowStart, rowEnd, columns, data : { numRows, header } } )
131+ checkSignal ( signal )
132+
133+ const ranges = [ ]
134+ let currentRange : [ number , number ] | undefined = undefined
135+ for ( let row = rowStart ; row < rowEnd ; row ++ ) {
136+ if ( isCachedOrFetching ( { row, columns } ) ) {
137+ if ( currentRange ) {
138+ ranges . push ( currentRange )
139+ currentRange = undefined
140+ }
141+ } else {
142+ if ( ! currentRange ) {
143+ currentRange = [ row , row + 1 ]
144+ } else {
145+ currentRange [ 1 ] = row + 1
146+ }
147+ }
148+ }
149+ if ( currentRange ) {
150+ ranges . push ( currentRange )
151+ }
152+ console . log ( `Fetching rows ${ rowStart } - ${ rowEnd } (${ ranges . length } ranges)` , { ranges, columns, cache : { rowNumberCache, cellCache } } )
153+
154+ const promises = ranges . map ( async ( [ start , end ] ) => {
155+ for ( let row = start ; row < end ; row ++ ) {
156+ rowNumberCache [ row ] = { kind : 'fetching' }
157+ for ( const column of columns ?? [ ] ) {
158+ const array = cellCache . get ( column )
159+ if ( ! array ) throw new Error ( `Column ${ column } not found in cache` )
160+ array [ row ] = { kind : 'fetching' }
161+ }
162+ }
163+
164+ const rows = await icebergRead ( {
165+ tableUrl,
166+ rowStart : start ,
167+ rowEnd : end ,
168+ metadataFileName,
169+ metadata,
170+ } )
171+
172+ const rowsEnd = rows . length + start
173+
174+ for ( const [ i , cells ] of rows . entries ( ) ) {
175+ const row = i + start
176+ rowNumberCache [ row ] = { kind : 'fetched' , value : { value : row } }
177+ for ( const column of columns ?? [ ] ) {
178+ const array = cellCache . get ( column )
179+ if ( ! array ) throw new Error ( `Column ${ column } not found in cache` )
180+ array [ row ] = { kind : 'fetched' , value : { value : cells [ column ] } }
181+ }
182+ }
183+ // Not sure if it's the best way to handle the missing rows
184+ for ( let row = start + rowsEnd ; row < end ; row ++ ) {
185+ rowNumberCache [ row ] = { kind : 'fetched' , value : { value : - 1 } } // Indicating that the row is not available - totally not a perfect idea, but it works for now
186+ for ( const column of columns ?? [ ] ) {
187+ const array = cellCache . get ( column )
188+ if ( ! array ) throw new Error ( `Column ${ column } not found in cache` )
189+ array [ row ] = { kind : 'fetched' , value : { value : undefined } }
190+ }
191+ }
192+
193+ eventTarget . dispatchEvent ( new CustomEvent ( 'resolve' ) )
194+ } )
195+
196+ await Promise . all ( promises )
96197 } ,
97- } ) )
198+
199+ }
200+ return sortableDataFrame ( unsortableDataFrame )
201+
202+ }
203+
204+ function validateFetchParams ( { rowStart, rowEnd, columns, data : { numRows, header } } : { rowStart : number , rowEnd : number , columns ?: string [ ] , data : Pick < DataFrame , 'numRows' | 'header' > } ) : void {
205+ if ( rowStart < 0 || rowEnd > numRows || ! Number . isInteger ( rowStart ) || ! Number . isInteger ( rowEnd ) || rowStart > rowEnd ) {
206+ throw new Error ( `Invalid row range: ${ rowStart } - ${ rowEnd } , numRows: ${ numRows } ` )
207+ }
208+ if ( columns ?. some ( column => ! header . includes ( column ) ) ) {
209+ throw new Error ( `Invalid columns: ${ columns . join ( ', ' ) } . Available columns: ${ header . join ( ', ' ) } ` )
210+ }
211+ }
212+ function validateRow ( { row, data : { numRows } } : { row : number , data : Pick < DataFrame , 'numRows' > } ) : void {
213+ if ( row < 0 || row >= numRows || ! Number . isInteger ( row ) ) {
214+ throw new Error ( `Invalid row index: ${ row } , numRows: ${ numRows } ` )
215+ }
216+ }
217+ function validateColumn ( { column, data : { header } } : { column : string , data : Pick < DataFrame , 'header' > } ) : void {
218+ if ( ! header . includes ( column ) ) {
219+ throw new Error ( `Invalid column: ${ column } . Available columns: ${ header . join ( ', ' ) } ` )
220+ }
221+ }
222+ function checkSignal ( signal ?: AbortSignal ) : void {
223+ if ( signal ?. aborted ) {
224+ throw new DOMException ( 'The operation was aborted.' , 'AbortError' )
225+ }
98226}
0 commit comments