Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 75 additions & 17 deletions website/docs/quickstart/flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,17 @@ mkdir fluss-quickstart-flink
cd fluss-quickstart-flink
```

2. Create a `docker-compose.yml` file with the following content:
2. Create a Dockerfile named `fluss.Dockerfile` as follows. You can adjust the Flink version as needed. Please make sure to download the compatible versions of [fluss-flink connector jar](/downloads), [flink-connector-faker](https://github.com/knaufk/flink-faker/releases) in the Dockerfile.

```Dockerfile
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A docker file looks wried to me. Could we just mount the libs to flink image just like we did for iceberg?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

ARG FLINK_VERSION="1.20"
FROM flink:${FLINK_VERSION}
ARG FLINK_VERSION
RUN wget -P /opt/flink/lib https://github.com/knaufk/flink-faker/releases/download/v0.5.3/flink-faker-0.5.3.jar
RUN wget -P /opt/flink/lib https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-${FLINK_VERSION}/$FLUSS_DOCKER_VERSION$/fluss-flink-${FLINK_VERSION}-$FLUSS_DOCKER_VERSION$.jar
```

3. Create a `docker-compose.yml` file with the following content:


```yaml
Expand Down Expand Up @@ -69,7 +79,8 @@ services:
#end
#begin Flink cluster
jobmanager:
image: apache/fluss-quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$
build:
dockerfile: ./fluss.Dockerfile
ports:
- "8083:8081"
command: jobmanager
Expand All @@ -78,7 +89,8 @@ services:
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager:
image: apache/fluss-quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$
build:
dockerfile: ./fluss.Dockerfile
depends_on:
- jobmanager
command: taskmanager
Expand All @@ -89,16 +101,23 @@ services:
taskmanager.numberOfTaskSlots: 10
taskmanager.memory.process.size: 2048m
taskmanager.memory.framework.off-heap.size: 256m
sql-client:
build:
dockerfile: ./fluss.Dockerfile
command: bin/sql-client.sh
depends_on:
- jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
rest.address: jobmanager
#end
```

The Docker Compose environment consists of the following containers:
- **Fluss Cluster:** a Fluss `CoordinatorServer`, a Fluss `TabletServer` and a `ZooKeeper` server.
- **Flink Cluster**: a Flink `JobManager` and a Flink `TaskManager` container to execute queries.

**Note:** The `apache/fluss-quickstart-flink` image is based on [flink:1.20.1-java17](https://hub.docker.com/layers/library/flink/1.20-java17/images/sha256:bf1af6406c4f4ad8faa46efe2b3d0a0bf811d1034849c42c1e3484712bc83505) and
includes the [fluss-flink](engine-flink/getting-started.md) and
[flink-connector-faker](https://flink-packages.org/packages/flink-faker) to simplify this guide.
- **Flink Cluster**: a Flink `JobManager`, a Flink `TaskManager`, and a Flink SQL client container to execute queries.

3. To start all containers, run:
```shell
Expand All @@ -116,7 +135,6 @@ You can also visit http://localhost:8083/ to see if Flink is running normally.

:::note
- If you want to additionally use an observability stack, follow one of the provided quickstart guides [here](maintenance/observability/quickstart.md) and then continue with this guide.
- If you want to run with your own Flink environment, remember to download the [fluss-flink connector jar](/downloads), [flink-connector-faker](https://github.com/knaufk/flink-faker/releases) and then put them to `FLINK_HOME/lib/`.
- All the following commands involving `docker compose` should be executed in the created working directory that contains the `docker-compose.yml` file.
:::

Expand All @@ -125,26 +143,66 @@ Congratulations, you are all set!
## Enter into SQL-Client
First, use the following command to enter the Flink SQL CLI Container:
```shell
docker compose exec jobmanager ./sql-client
docker compose run sql-client
```

**Note**:
To simplify this guide, three temporary tables have been pre-created with `faker` connector to generate data.
You can view their schemas by running the following commands:
To simplify this guide, we will create three temporary tables with `faker` connector to generate data:

```sql title="Flink SQL"
SHOW CREATE TABLE source_customer;
CREATE TEMPORARY TABLE source_order (
`order_key` BIGINT,
`cust_key` INT,
`total_price` DECIMAL(15, 2),
`order_date` DATE,
`order_priority` STRING,
`clerk` STRING
) WITH (
'connector' = 'faker',
'rows-per-second' = '10',
'number-of-rows' = '10000',
'fields.order_key.expression' = '#{number.numberBetween ''0'',''100000000''}',
'fields.cust_key.expression' = '#{number.numberBetween ''0'',''20''}',
'fields.total_price.expression' = '#{number.randomDouble ''3'',''1'',''1000''}',
'fields.order_date.expression' = '#{date.past ''100'' ''DAYS''}',
'fields.order_priority.expression' = '#{regexify ''(low|medium|high){1}''}',
'fields.clerk.expression' = '#{regexify ''(Clerk1|Clerk2|Clerk3|Clerk4){1}''}'
);
```

```sql title="Flink SQL"
SHOW CREATE TABLE source_order;
CREATE TEMPORARY TABLE source_customer (
`cust_key` INT,
`name` STRING,
`phone` STRING,
`nation_key` INT NOT NULL,
`acctbal` DECIMAL(15, 2),
`mktsegment` STRING,
PRIMARY KEY (`cust_key`) NOT ENFORCED
) WITH (
'connector' = 'faker',
'number-of-rows' = '200',
'fields.cust_key.expression' = '#{number.numberBetween ''0'',''20''}',
'fields.name.expression' = '#{funnyName.name}',
'fields.nation_key.expression' = '#{number.numberBetween ''1'',''5''}',
'fields.phone.expression' = '#{phoneNumber.cellPhone}',
'fields.acctbal.expression' = '#{number.randomDouble ''3'',''1'',''1000''}',
'fields.mktsegment.expression' = '#{regexify ''(AUTOMOBILE|BUILDING|FURNITURE|MACHINERY|HOUSEHOLD){1}''}'
);
```

```sql title="Flink SQL"
SHOW CREATE TABLE source_nation;
CREATE TEMPORARY TABLE `source_nation` (
`nation_key` INT NOT NULL,
`name` STRING,
PRIMARY KEY (`nation_key`) NOT ENFORCED
) WITH (
'connector' = 'faker',
'number-of-rows' = '100',
'fields.nation_key.expression' = '#{number.numberBetween ''1'',''5''}',
'fields.name.expression' = '#{regexify ''(CANADA|JORDAN|CHINA|UNITED|INDIA){1}''}'
);
```


## Create Fluss Tables
### Create Fluss Catalog
Use the following SQL to create a Fluss catalog:
Expand Down