SQL

Fluvio SQL Sink connector

The SQL Sink connector reads records from Fluvio topic, applies configured transformations, and sends new records to the SQL database (via INSERT statements).

 

Supported databases

  1. PostgreSQL
  2. SQLite
 

Data types

Model PostgreSQL SQLite
Bool BOOL BOOLEAN
Char CHAR INTEGER
SmallInt SMALLINT, SMALLSERIAL, INT2 INTEGER
Int INT, SERIAL, INT4 INTEGER
BigInt BIGINT, BIGSERIAL, INT8 BIGINT, INT8
Float REAL, FLOAT4 REAL
DoublePrecision DOUBLE PRECISION, FLOAT8 REAL
Text VARCHAR, CHAR(N), TEXT, NAME TEXT
Bytes BYTEA BLOB
Numeric NUMERIC REAL
Timestamp TIMESTAMP DATETIME
Date DATE DATE
Time TIME TIME
Uuid UUID BLOB, TEXT
Json JSON, JSONB TEXT
 

Transformations

The SQL Sink connector expects the data in Fluvio SQL Model in JSON format. In order to work with different data formats or data structures, transformations can be applied. The transformation is a SmartModule pulled from the SmartModule Hub. Transformations are chained according to the order in the config. If a SmartModule requires configuration, it is passed via with section of transforms entry.

 

Configuration

Option default type description
url - String SQL database conection url
 

Basic example:

apiVersion: 0.1.0
meta:
  version: 0.4.4
  name: my-sql-connector
  type: sql-sink
  topic: sql-topic
  create-topic: true
  secrets:
    - name: DB_USERNAME
    - name: DB_PASSWORD
    - name: DB_HOST
    - name: DB_PORT
    - name: DB_NAME
sql:
  url: 'postgresql://${{ secrets.DB_USERNAME }}:${{ secrets.DB_PASSWORD }}@${{ secrets.DB_HOST }}:${{ secrets.DB_PORT }}/${{ secrets.DB_NAME }}'
 

Secrets

The connector can use secrets in order to hide sensitive information.

apiVersion: 0.1.0
meta:
  version: 0.4.4
  name: my-sql-connector
  type: sql-sink
  topic: sql-topic
  secrets:
    - name: DATABASE_URL
sql:
  url: ${{ secrets.DATABASE_URL }}
 

Offset Management

Fluvio Consumer Offset feature allows for a connector to store the offset in the Fluvio cluster and use it on restart.
To activate it, you need to provide the consumer name and set the strategy: auto.
See the example below:

apiVersion: 0.2.0
meta:
  version: 0.4.4
  name: my-sql-connector
  type: sql-sink
  topic:
    meta:
      name: sql-sink-topic
  consumer:
    id: my-sql-sink
    offset:
      strategy: auto
      start: beginning
      flush-period:
        secs: 10
        nanos: 0
  secrets:
    - name: DATABASE_URL
sql:
  url: ${{ secrets.DATABASE_URL }}

After the connector processed any records, you can check the last stored offset value via:

$ fluvio consumer list
  CONSUMER      TOPIC            PARTITION  OFFSET  LAST SEEN
  my-http-sink  http-sink-topic  0          0       3s
 

Insert Usage Example

Let’s look at the example of the connector with one transformation named infinyon/json-sql. The transformation takes records in JSON format and creates SQL insert operation to topic_message table. The value from device.device_id JSON field will be put to device_id column and the entire json body to record column.

The JSON record:

{
  "device": {
    "device_id": 1
  }
}

The SQL database (Postgres):

CREATE TABLE topic_message (device_id int, record json);

Connector configuration file:

# connector-config.yaml
apiVersion: 0.1.0
meta:
  version: 0.4.4
  name: json-sql-connector
  type: sql-sink
  topic: sql-topic
  create-topic: true
  secrets:
    - name: DATABASE_URL
sql:
  url: ${{ secrets.DATABASE_URL }}
transforms:
  - uses: infinyon/json-sql
    with:
      mapping:
        table: "topic_message"
        map-columns:
          "device_id":
            json-key: "device.device_id"
            value:
              type: "int"
              default: "0"
              required: true
          "record":
            json-key: "$"
            value:
              type: "jsonb"
              required: true

You can use Fluvio cdk tool to deploy the connector:

cdk deploy start --config connector-config.yaml

To delete the connector run:

cdk deploy shutdown --name json-sql-connector

After you run the connector you will see records in your database table.

See more in our Build MQTT to SQL Pipeline and Build HTTP to SQL Pipeline tutorials.

 

Upsert Usage Example

Every step would be same except the connector config and the behavior of the connector after deployment.

We have a operation parameter which defaults to insert but we can pass upsert here to specify we want to do an upsert operation.

Upsert additionaly takes an unique-columns argument. unique-columns specifies the list indices or column names to check for uniqueness of a record. If a record with same value in unique-columns exists in the database, it will be updated. If no record exists with same value, the given record will be inserted.

Connector configuration file for upsert (assuming device_id is a unique column or an index in the database):

# connector-config.yaml
apiVersion: 0.1.0
meta:
  version: 0.4.4
  name: json-sql-connector
  type: sql-sink
  topic: sql-topic
  create-topic: true
  secrets:
    - name: DATABASE_URL
sql:
  url: ${{ secrets.DATABASE_URL }}
transforms:
  - uses: infinyon/json-sql
    with:
      mapping:
        operation: "upsert"
        unique-columns:
          - "device_id"
        table: "topic_message"
        map-columns:
          "device_id":
            json-key: "device.device_id"
            value:
              type: "int"
              default: "0"
              required: true
          "record":
            json-key: "$"
            value:
              type: "jsonb"
              required: true

See more about upsert in our blog. Note: the blog doesn’t use json-sql smartmodule and has hardcoded records for demonstration. sql-connector is intended to be used with json-sql.