Fluvio SQL Sink connector
The SQL Sink connector reads records from a Fluvio topic, applies the configured transformations, and
sends the resulting records to the SQL database (via INSERT statements).
- PostgreSQL
- SQLite
Model | PostgreSQL | SQLite |
---|---|---|
Bool | BOOL | BOOLEAN |
Char | CHAR | INTEGER |
SmallInt | SMALLINT, SMALLSERIAL, INT2 | INTEGER |
Int | INT, SERIAL, INT4 | INTEGER |
BigInt | BIGINT, BIGSERIAL, INT8 | BIGINT, INT8 |
Float | REAL, FLOAT4 | REAL |
DoublePrecision | DOUBLE PRECISION, FLOAT8 | REAL |
Text | VARCHAR, CHAR(N), TEXT, NAME | TEXT |
Bytes | BYTEA | BLOB |
Numeric | NUMERIC | REAL |
Timestamp | TIMESTAMP | DATETIME |
Date | DATE | DATE |
Time | TIME | TIME |
Uuid | UUID | BLOB, TEXT |
Json | JSON, JSONB | TEXT |
The SQL Sink connector expects the data in Fluvio SQL Model in JSON format.
In order to work with different data formats or data structures, transformations
can be applied.
A transformation is a SmartModule pulled from the SmartModule Hub. Transformations are chained in the order
they appear in the config. If a SmartModule requires configuration, it is passed via the with section of its
transforms entry.
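The ordering rule can be illustrated with a small Python sketch. It is not how Fluvio runs SmartModules; the names example/rename-key and example/add-field, and the two functions behind them, are hypothetical stand-ins. The sketch only shows that each step receives the output of the previous one, together with the parameters from its own with section.

```python
import json

# Hypothetical stand-ins for SmartModules; real ones run inside Fluvio.
def rename_key(record, params):
    """Rename one top-level key using the step's `with` parameters."""
    out = dict(record)
    out[params["to"]] = out.pop(params["from"])
    return out

def add_field(record, params):
    """Add a constant field using the step's `with` parameters."""
    return {**record, params["name"]: params["value"]}

# Hypothetical lookup table from a `uses` name to an implementation.
REGISTRY = {"example/rename-key": rename_key, "example/add-field": add_field}

# Same shape as a `transforms` list in a connector config: order matters.
transforms = [
    {"uses": "example/rename-key", "with": {"from": "id", "to": "device_id"}},
    {"uses": "example/add-field", "with": {"name": "source", "value": "demo"}},
]

def apply_chain(raw, steps):
    """Feed the record through every step, in config order."""
    record = json.loads(raw)
    for step in steps:
        record = REGISTRY[step["uses"]](record, step.get("with", {}))
    return record

result = apply_chain('{"id": 7}', transforms)
print(result)
```

Swapping the two entries in the list would still work here, but in a real pipeline a later step usually depends on the shape produced by an earlier one, which is why the order in the config is significant.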
Option | default | type | description |
---|---|---|---|
url | - | String | SQL database connection URL |
apiVersion: 0.1.0
meta:
  version: 0.4.4
  name: my-sql-connector
  type: sql-sink
  topic: sql-topic
  create-topic: true
  secrets:
    - name: DB_USERNAME
    - name: DB_PASSWORD
    - name: DB_HOST
    - name: DB_PORT
    - name: DB_NAME
sql:
  url: 'postgresql://${{ secrets.DB_USERNAME }}:${{ secrets.DB_PASSWORD }}@${{ secrets.DB_HOST }}:${{ secrets.DB_PORT }}/${{ secrets.DB_NAME }}'
The connector can use secrets in order to hide sensitive information.
apiVersion: 0.1.0
meta:
  version: 0.4.4
  name: my-sql-connector
  type: sql-sink
  topic: sql-topic
  secrets:
    - name: DATABASE_URL
sql:
  url: ${{ secrets.DATABASE_URL }}
The Fluvio Consumer Offset feature allows a connector to store its offset in the Fluvio cluster and resume from it on restart.
To activate it, provide a consumer name and set the offset strategy to auto.
See the example below:
apiVersion: 0.2.0
meta:
  version: 0.4.4
  name: my-sql-connector
  type: sql-sink
  topic:
    meta:
      name: sql-sink-topic
  consumer:
    id: my-sql-sink
    offset:
      strategy: auto
      start: beginning
      flush-period:
        secs: 10
        nanos: 0
  secrets:
    - name: DATABASE_URL
sql:
  url: ${{ secrets.DATABASE_URL }}
After the connector has processed some records, you can check the last stored offset value via:
$ fluvio consumer list
CONSUMER     TOPIC           PARTITION  OFFSET  LAST SEEN
my-sql-sink  sql-sink-topic  0          0       3s
Let’s look at an example of the connector with one transformation named infinyon/json-sql. The transformation takes
records in JSON format and creates an SQL insert operation for the topic_message table. The value of the
device.device_id JSON field is put into the device_id column, and the entire JSON body into the record column.
The JSON record:
{
  "device": {
    "device_id": 1
  }
}
The SQL database (Postgres):
CREATE TABLE topic_message (device_id int, record json);
Connector configuration file:
# connector-config.yaml
apiVersion: 0.1.0
meta:
  version: 0.4.4
  name: json-sql-connector
  type: sql-sink
  topic: sql-topic
  create-topic: true
  secrets:
    - name: DATABASE_URL
sql:
  url: ${{ secrets.DATABASE_URL }}
transforms:
  - uses: infinyon/json-sql
    with:
      mapping:
        table: "topic_message"
        map-columns:
          "device_id":
            json-key: "device.device_id"
            value:
              type: "int"
              default: "0"
              required: true
          "record":
            json-key: "$"
            value:
              type: "jsonb"
              required: true
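To make the mapping above concrete, here is a rough Python sketch of what it asks for: resolve each json-key against the record and produce one value per column. This is an illustration only, not the json-sql SmartModule's implementation. The helper names lookup and map_columns are made up, and the way default and required interact (default is tried first, then required is checked) is an assumption of this sketch.

```python
import json

def lookup(record, json_key):
    """Resolve a dotted key such as "device.device_id"; "$" is the whole record."""
    if json_key == "$":
        return record
    current = record
    for part in json_key.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current

def map_columns(record, columns):
    """Build {column: value} from a map-columns style description."""
    row = {}
    for column, spec in columns.items():
        rules = spec["value"]
        found = lookup(record, spec["json-key"])
        if found is None:
            found = rules.get("default")  # assumption: default is tried first
        if found is None:
            if rules.get("required"):
                raise ValueError(f"missing required column {column!r}")
            row[column] = None
            continue
        if rules["type"] == "int":
            row[column] = int(found)
        elif rules["type"] in ("json", "jsonb"):
            # JSON columns receive the serialized body
            row[column] = json.dumps(found, separators=(",", ":"))
        else:
            row[column] = found
    return row

# Same shape as the map-columns section of the config above.
columns = {
    "device_id": {
        "json-key": "device.device_id",
        "value": {"type": "int", "default": "0", "required": True},
    },
    "record": {"json-key": "$", "value": {"type": "jsonb", "required": True}},
}

row = map_columns({"device": {"device_id": 1}}, columns)
print(row)
```

For the sample record, the sketch yields device_id 1 and the compact JSON text of the whole body for record, which are the two values an INSERT into topic_message would carry.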
You can use the Fluvio cdk tool to deploy the connector:
cdk deploy start --config connector-config.yaml
To delete the connector, run:
cdk deploy shutdown --name json-sql-connector
After you run the connector you will see records in your database table.
See more in our Build MQTT to SQL Pipeline and Build HTTP to SQL Pipeline tutorials.
For upsert, every step is the same as above except for the connector config and the behavior of the connector after deployment.
The operation parameter defaults to insert; set it to upsert to perform an upsert operation instead.
Upsert additionally takes a unique-columns argument, which specifies the list of indices or column names used to
check a record for uniqueness.
If a record with the same values in unique-columns already exists in the database, it is updated. Otherwise, the
given record is inserted.
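The update-or-insert behavior can be tried locally with Python's built-in sqlite3 module. The sketch below is an assumption-laden illustration: it uses SQLite's INSERT ... ON CONFLICT ... DO UPDATE syntax (available in SQLite 3.24 and later) to reproduce the semantics described above, and it does not claim to show the exact statement the connector sends. The temp field in the sample records is invented for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# device_id must be unique for the conflict clause to apply.
conn.execute("CREATE TABLE topic_message (device_id INTEGER UNIQUE, record TEXT)")

UPSERT = """
INSERT INTO topic_message (device_id, record) VALUES (?, ?)
ON CONFLICT (device_id) DO UPDATE SET record = excluded.record
"""

# First record for device 1: inserted.
conn.execute(UPSERT, (1, '{"device":{"device_id":1},"temp":20}'))
# Same device_id again: the existing row is updated, no new row appears.
conn.execute(UPSERT, (1, '{"device":{"device_id":1},"temp":25}'))
# A different device_id: inserted as a new row.
conn.execute(UPSERT, (2, '{"device":{"device_id":2},"temp":18}'))

rows = conn.execute(
    "SELECT device_id, record FROM topic_message ORDER BY device_id"
).fetchall()
for r in rows:
    print(r)
```

After the three statements the table holds two rows, and the row for device 1 carries the body of the second record.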
Connector configuration file for upsert (assuming device_id is a unique column or an index in the database):
# connector-config.yaml
apiVersion: 0.1.0
meta:
  version: 0.4.4
  name: json-sql-connector
  type: sql-sink
  topic: sql-topic
  create-topic: true
  secrets:
    - name: DATABASE_URL
sql:
  url: ${{ secrets.DATABASE_URL }}
transforms:
  - uses: infinyon/json-sql
    with:
      mapping:
        operation: "upsert"
        unique-columns:
          - "device_id"
        table: "topic_message"
        map-columns:
          "device_id":
            json-key: "device.device_id"
            value:
              type: "int"
              default: "0"
              required: true
          "record":
            json-key: "$"
            value:
              type: "jsonb"
              required: true
See more about upsert in our blog.
Note: the blog doesn’t use the json-sql SmartModule and has hardcoded records for demonstration. The sql-connector
is intended to be used with json-sql.