Snowflake - Near Real-Time Ingestion from RDBMS using Debezium and Kafka

Karthik Venkatraman
12 min read · Apr 19, 2021

Overview

This document details the process flow for near real-time ingestion of data from a standard RDBMS into Snowflake using log-based change data capture. It also covers the key data engineering concepts within Snowflake, specifically the orchestration of ELT processes once the CDC data lands in Snowflake. This is achieved using a combination of Snowpipe (wrapped under the Kafka-Snowflake connector), streams (CDC-like capability within Snowflake) and tasks (orchestration process).

The purpose of this document is:

  1. Enable developers to set up a working demo in their environment and complete a quick POV
  2. Help explorers of Snowflake experience the purely managed power of the Snowflake ecosystem

The high-level flow of data is: MySQL → Debezium (log-based CDC) → Kafka → Snowflake Kafka connector (Snowpipe) → Snowflake, where streams and tasks take over the ELT.

The sections below record the steps to configure MySQL to Kafka and Kafka to Snowflake to showcase a demo of near real-time CDC ingestion.

What you need to set up this end-to-end pipeline on macOS:

  1. Snowflake account & SnowSQL installed
  2. Docker, Docker Compose and Git installed on your machine
  3. Confluent — all-in-one Kafka bundle (steps given below). Confluent provides a pre-built, ready-to-use Kafka Docker setup for demos.
  4. Conduktor — Kafka desktop client. This GUI tool is used for browsing and managing Kafka brokers and associated services. This is optional.
  5. MySQL Docker image — we will use the Debezium pre-configured MySQL image

Snowflake Setup (15 mins)

Sign up for a Snowflake trial. The Standard edition is best for this demo.

Generate a public key and private key to enable key-based auth. The Kafka connector relies on key pair authentication rather than the typical username/password authentication. This authentication method requires a 2048-bit (minimum) RSA key pair. Generate the public-private key pair using OpenSSL. The public key is assigned to the Snowflake user defined in the configuration file.

Please refer to the link for more details on how to generate a public key and private key. I suggest using the encrypted version of the private key, as shown in the commands below.

Open a Unix/Linux terminal once more and generate the keys as follows:

mkdir -p snowflake-real-time/kafka/keys
cd snowflake-real-time/kafka/keys
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
## Note down the passphrase that you enter. It will be required subsequently as well
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
cat rsa_key.pub
## Copy the public key without the BEGIN and END lines and proceed to the Snowflake UI for user creation with the public key as one of the auth mechanisms ##
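
To get the public key as one continuous string, ready to paste into the CREATE OR REPLACE USER statement below, a small sketch like the following can help; it simply drops the BEGIN/END header lines and the line breaks:

## print the public key as a single line for pasting into RSA_PUBLIC_KEY
grep -v "PUBLIC KEY" rsa_key.pub | tr -d '\n'; echo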

Set up Snowflake as given below. Log in to the Snowflake UI (sign up for a free trial if you don't have one) with the URL that is associated with your account. For the demo setup, please use credentials that preferably carry the following roles:

  1. AccountAdmin
  2. SysAdmin
  3. SecurityAdmin

Open a new worksheet in Snowflake UI and execute the below queries.

USE ROLE SYSADMIN;
// CREATE A DATABASE FOR THIS DEMO
CREATE OR REPLACE DATABASE KAFKA_DB COMMENT = 'DATABASE FOR KAFKACONNECT DEMO';
// CREATE WAREHOUSE FOR THIS DEMO
CREATE OR REPLACE WAREHOUSE KAFKA_WH
WAREHOUSE_SIZE = 'XSMALL'
AUTO_SUSPEND = 30
AUTO_RESUME = TRUE
MIN_CLUSTER_COUNT = 1
MAX_CLUSTER_COUNT = 1
SCALING_POLICY = 'STANDARD' COMMENT = 'WAREHOUSE FOR KAFKA DEMO';
// CREATE ROLES AND USERS
USE ROLE SECURITYADMIN;
// CREATE ROLE
CREATE OR REPLACE ROLE KAFKA_CONNECTOR_ROLE;
// CREATE USER WITH AUTH MECHANISM BEING BOTH USER/PASSWORD AND KEY BASED AUTHENTICATION
CREATE OR REPLACE USER KAFKA_DEMO
RSA_PUBLIC_KEY = '<PUBLIC KEY AS GENERATED FROM PREVIOUS STEP>'
PASSWORD = '<ANY PLAIN TEXT PASSWORD>'
DEFAULT_ROLE = "KAFKA_CONNECTOR_ROLE"
DEFAULT_WAREHOUSE = 'KAFKA_WH'
DEFAULT_NAMESPACE = 'KAFKA_DEMO.PUBLIC'
MUST_CHANGE_PASSWORD = FALSE;
// GRANT ROLE TO USER
GRANT ROLE KAFKA_CONNECTOR_ROLE TO USER KAFKA_DEMO;
// CHANGE ROLE TO GRANT PRIVILEGES
USE ROLE SYSADMIN;
// GRANT PRIVILEGES ON OBJECT TO ROLE
// AT THIS TIME FOR THE DEMO WE ARE PROVIDING ALL PRIVILEGES
GRANT ALL ON WAREHOUSE KAFKA_WH TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT ALL ON DATABASE KAFKA_DB TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT ALL ON SCHEMA KAFKA_DB.PUBLIC TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT ALL ON FUTURE TABLES IN SCHEMA KAFKA_DB.PUBLIC TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT CREATE TABLE ON SCHEMA KAFKA_DB.PUBLIC TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT CREATE STAGE ON SCHEMA KAFKA_DB.PUBLIC TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT CREATE PIPE ON SCHEMA KAFKA_DB.PUBLIC TO ROLE KAFKA_CONNECTOR_ROLE;
// CHANGE ROLE TO PROVIDE KAFKA ROLE TO EXECUTE TASKS
USE ROLE ACCOUNTADMIN;
// EXECUTE ON TASKS
GRANT EXECUTE TASK ON ACCOUNT TO ROLE KAFKA_CONNECTOR_ROLE;

Log out as the current user and log in again to Snowflake with the KAFKA_DEMO user created above. Create tables and objects using the current role as given below.

USE ROLE KAFKA_CONNECTOR_ROLE;
USE SCHEMA KAFKA_DB.PUBLIC;
USE WAREHOUSE KAFKA_WH;
// CREATE TABLES FOR CAPTURING THE CDC DATA THAT WILL BE PUSHED
// LATER FROM MYSQL. NOTE THAT KAFKA CONNECTOR CAN CREATE THE TABLE
// AUTOMATICALLY AS WELL IF YOU DONT WANT THE HASSLE OF CREATING
// TABLE PER TOPIC
// CREATE A RAW LANDING TABLE FOR KAFKA TOPIC. I HAVE ADDED COUPLE
// OF ADDITIONAL COLUMNS FOR THIS DEMO
CREATE OR REPLACE TABLE MYSQL_INVENTORY_CUSTOMERS (
RECORD_METADATA VARIANT,
RECORD_CONTENT VARIANT,
REC_NUM NUMBER(38,0) IDENTITY,
REC_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
// CREATE A STAGING TABLE - THIS MAY MIMIC YOUR SOURCE TABLE
// STRUCTURE
CREATE OR REPLACE TABLE DIM_CUSTOMER (
CUSTOMER_ID INT NOT NULL ,
CUSTOMER_FIRST_NAME VARCHAR(255) NOT NULL,
CUSTOMER_LAST_NAME VARCHAR(255) NOT NULL,
CUSTOMER_EMAIL_ID VARCHAR(255) NOT NULL,
CUSTOMER_IS_ACTIVE BOOLEAN NOT NULL DEFAULT TRUE
);

Create streams and tasks to orchestrate the data flow. Refer to this article to understand what Streams and Tasks are.

// CAPTURE INCREMENTAL DATA USING STREAM
CREATE OR REPLACE STREAM STRM_MYSQL_INVENTORY_CUSTOMERS ON TABLE MYSQL_INVENTORY_CUSTOMERS;
// CREATE A TASK TO ORCHESTRATE THE ELT PROCESS
CREATE OR REPLACE TASK TASK_SAVE_STRM_MYSQL_INVENTORY_CUSTOMERS
WAREHOUSE = KAFKA_WH
SCHEDULE = '1 MINUTE'
WHEN
SYSTEM$STREAM_HAS_DATA('STRM_MYSQL_INVENTORY_CUSTOMERS')
AS
CREATE OR REPLACE TRANSIENT TABLE TRAN_DIM_CUSTOMER AS
SELECT
RECORD_CONTENT:op::STRING AS DB_CHANGE_FLAG,
RECORD_CONTENT:after.first_name::STRING AS CUSTOMER_FIRST_NAME,
RECORD_CONTENT:after.last_name::STRING AS CUSTOMER_LAST_NAME,
CASE
WHEN RECORD_CONTENT:op::STRING = 'd' THEN RECORD_CONTENT:before.id::INT
ELSE RECORD_CONTENT:after.id::INT
END AS CUSTOMER_ID,
RECORD_CONTENT:after.email::STRING AS CUSTOMER_EMAIL_ID
FROM STRM_MYSQL_INVENTORY_CUSTOMERS WHERE RECORD_CONTENT:op::STRING IS NOT NULL;
// CREATE A DEPENDENT TASK TO BUILD A TASK TREE
CREATE OR REPLACE TASK TASK_INSERT_MYSQL_INVENTORY_CUSTOMERS
WAREHOUSE = KAFKA_WH
AFTER TASK_SAVE_STRM_MYSQL_INVENTORY_CUSTOMERS
AS
INSERT INTO DIM_CUSTOMER (CUSTOMER_ID, CUSTOMER_FIRST_NAME, CUSTOMER_LAST_NAME, CUSTOMER_EMAIL_ID)
SELECT CC.CUSTOMER_ID, CC.CUSTOMER_FIRST_NAME, CC.CUSTOMER_LAST_NAME, CC.CUSTOMER_EMAIL_ID FROM TRAN_DIM_CUSTOMER CC WHERE DB_CHANGE_FLAG = 'c';
// CREATE A DEPENDENT TASK TO BUILD A TASK TREE
CREATE OR REPLACE TASK TASK_UPDATE_MYSQL_INVENTORY_CUSTOMERS
WAREHOUSE = KAFKA_WH
AFTER TASK_INSERT_MYSQL_INVENTORY_CUSTOMERS
AS
MERGE INTO DIM_CUSTOMER DC
USING TRAN_DIM_CUSTOMER TDC ON DC.CUSTOMER_ID = TDC.CUSTOMER_ID
WHEN MATCHED AND DB_CHANGE_FLAG = 'u' THEN UPDATE SET DC.CUSTOMER_FIRST_NAME = TDC.CUSTOMER_FIRST_NAME, DC.CUSTOMER_LAST_NAME = TDC.CUSTOMER_LAST_NAME, DC.CUSTOMER_EMAIL_ID = TDC.CUSTOMER_EMAIL_ID
WHEN MATCHED AND DB_CHANGE_FLAG = 'd' THEN UPDATE SET CUSTOMER_IS_ACTIVE=FALSE;
// ENABLE THE TASK
ALTER TASK TASK_UPDATE_MYSQL_INVENTORY_CUSTOMERS RESUME;
ALTER TASK TASK_INSERT_MYSQL_INVENTORY_CUSTOMERS RESUME;
ALTER TASK TASK_SAVE_STRM_MYSQL_INVENTORY_CUSTOMERS RESUME;
// CHECK TASK EXECUTION TO MAKE SURE THERE ARE NO ERRORS
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
SCHEDULED_TIME_RANGE_START=>DATEADD('HOUR',-1,CURRENT_TIMESTAMP()),
RESULT_LIMIT => 10));

Go back to the LINUX/UNIX terminal and test the public key/private key to ensure things are set up correctly. We will be using Snowsql. Steps to install and configure Snowsql can be found here.

// PRIVATE KEY PATH IS THE PATH WHERE YOU HAD GENERATED THE KEYS
// PREVIOUSLY
snowsql -a <account_name> -u kafka_demo --private-key-path <BASE PATH>/snowflake-real-time/kafka/keys/rsa_key.p8
// ONCE LOGGED INTO SNOWSQL PLEASE CHECK THE FOLLOWING:
SELECT CURRENT_DATABASE(); -- IT SHOULD BE KAFKA_DB
SELECT CURRENT_SCHEMA(); -- IT SHOULD BE PUBLIC
SELECT CURRENT_ROLE(); -- IT SHOULD BE KAFKA_CONNECTOR_ROLE

Docker, Docker Compose and Git (5 mins)

The steps below are taken from the Confluent quickstart page, under the prerequisites section.

  1. Docker version 1.11 or later is installed and running. Refer to the link for details
  2. Install Docker Compose as well. On Mac it is installed by default with Docker Desktop
  3. Install Git
  4. Allocate a minimum of 8 GB of memory to Docker. When using Docker Desktop for Mac, the default Docker memory allocation is 2 GB. You can change the default allocation to 8 GB under Preferences > Resources > Advanced

Installing Confluent Kafka using Docker (15 mins)

For this demo, the intention is to have a Kafka setup up and running. The easiest way is to use the pre-packaged Docker Compose stack provided by Confluent, and the same has been used for this demo:

Open a terminal and go to a directory in your local file system where you may want to organize the artifacts to be used

## go to the folder created earlier - come out of the keys folder
cd <BASE PATH>/snowflake-real-time/kafka

Clone the confluentinc/cp-all-in-one GitHub repository

git clone https://github.com/confluentinc/cp-all-in-one.git
cd cp-all-in-one/cp-all-in-one

Fire up Confluent Kafka from the same directory. There should be a docker-compose.yml already present in the directory:

docker-compose up -d

The result should be as follows:

Creating network "cp-all-in-one_default" with the default driver
Creating zookeeper ... done
Creating broker ... done
Creating schema-registry ... done
Creating rest-proxy ... done
Creating connect ... done
Creating ksql-datagen ... done
Creating ksqldb-server ... done
Creating control-center ... done
Creating ksqldb-cli ... done

Make a note of the network that has been created above. This is important for connecting the other containers that we will set up subsequently. You can use the below command to verify that Kafka is running:

docker ps -a

Open a browser on your local system and navigate to http://localhost:9021/clusters. If you see the cluster management page, then you are all good. Replace localhost with the IP/hostname if you did the steps on some other machine. Contact your administrator to figure out any issue at this point.
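
You can also check that the Kafka Connect worker is up by hitting its REST API (a quick sketch, assuming the default Connect REST port 8083 exposed by the cp-all-in-one stack):

## should return an empty JSON array [] on a fresh cluster
curl -s http://localhost:8083/connectors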

Testing Kafka using Conduktor (15 mins) — Optional

  1. Install Conduktor as the Kafka client
  2. We will be using the free version of Conduktor
  3. Open the Conduktor app after completing registration and click Add Cluster
  4. Give the cluster a name (anything you want)
  5. Enter the bootstrap server -> localhost:9092 (use the IP/hostname if on any other machine)
  6. Hit Test Kafka Connectivity

Play around on your own to explore how to create topics, produce messages to a topic and consume messages from a topic. A command-line alternative is sketched below.
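
If you prefer the command line, a minimal sketch (assuming the broker container name and listener ports from the cp-all-in-one stack, and a hypothetical topic name demo-topic) looks like this:

## create a test topic inside the broker container
docker exec -it broker kafka-topics --create --topic demo-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
## produce a few messages (type them in, then Ctrl+C to exit)
docker exec -it broker kafka-console-producer --topic demo-topic --bootstrap-server localhost:9092
## consume the messages back from the beginning
docker exec -it broker kafka-console-consumer --topic demo-topic --from-beginning --bootstrap-server localhost:9092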

Snowflake connector installation for Kafka (15 mins)

The Snowflake Kafka connector is a Kafka sink connector designed to run in a Kafka Connect cluster; it reads data from Kafka topics and writes the data into Snowflake tables in an automated fashion, without any engineering effort. Read more about it in the Snowflake documentation.

Snowflake provides two versions of the connector:

  • A version for the Confluent package version of Kafka — We will be using this one for the demo setup
  • A version for the open source software (OSS) Apache Kafka package

Download the latest connector published by Snowflake from the Maven repo. I am using 1.5.4 at the time of writing this article.

  • The Snowflake JAR file does not require any additional dependencies to use an unencrypted private key for key pair authentication. To use an encrypted private key, download the Bouncy Castle cryptography library (a JAR file). Snowflake uses Bouncy Castle to decrypt encrypted RSA private keys used to log in. Refer to Kafka Connector Snowflake Documentation for more information
  • Since Debezium now sends data in Avro format, we download the Avro converter as well

I am downloading the jars directly inside the Docker container to avoid downloading them locally and then copying them over.

In the terminal paste the following:

## Go into the docker shell of KAFKA CONNECT with root user 
docker exec -u 0 -it connect bash
## You should be inside docker shell as its root user
cd /usr/share/java
mkdir kafka-connect-snowflake
cd kafka-connect-snowflake
## Download the files directly to the directory
curl "https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.5.4/snowflake-kafka-connector-1.5.4.jar" --output snowflake-kafka-connector-1.5.4.jar
curl "https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.13.4/snowflake-jdbc-3.13.4.jar" --output snowflake-jdbc-3.13.4.jarcurl "https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar" --output bc-fips-1.0.1.jarcurl "https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar" --output bcpkix-fips-1.0.3.jarcurl "https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.2/avro-1.10.2.jar" --output avro-1.10.2.jar

You should now see the following jars in the directory within the connect container: snowflake-kafka-connector-1.5.4.jar, snowflake-jdbc-3.13.4.jar, bc-fips-1.0.1.jar, bcpkix-fips-1.0.3.jar and avro-1.10.2.jar.

To return to your local prompt from the connect docker type

exit

The connector is now installed. All that is left is to bounce the connect container from the local prompt. Make sure you have exited the connect container shell:

docker stop connect
docker start connect

If all went smoothly, you should now be able to see the Snowflake connector installed by logging into the Conduktor app, clicking Kafka Connect and then hitting the Create button, as shown below in the subsequent step:

You should see the connector listed in the 2nd tab (Kafka to DB arrow)
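
You can also confirm that the plugin is visible to the Connect worker through its REST API (a sketch, with the same assumption about port 8083):

## the Snowflake sink connector class should appear in the plugin list
curl -s http://localhost:8083/connector-plugins | grep -i snowflake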

Debezium Mysql CDC Setup (15 mins)

Install the MySQL-to-Kafka CDC connector based on Debezium. Pull and install the Debezium-configured MySQL image from Docker Hub, following the Docker steps mentioned earlier.

Pull the pre-configured MySQL image published by Debezium. Paste the below command in the Linux/Unix terminal:

docker pull debezium/example-mysql

Start up the MySQL container:

docker run -d  --net=cp-all-in-one_default --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=mysql -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqluser debezium/example-mysql

The MySQL container already has all the Debezium-related configuration hooked up into MySQL for capturing data from the MySQL binlog. Next, install the Debezium source connector in the Kafka Connect container:

docker exec -u 0 -it connect bash
## once inside the container execute the following
cd /usr/share/java/
mkdir debezium-connector-mysql
cd debezium-connector-mysql
## get the latest link archive file from here
## replace the link in the curl command as needed
curl "https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.5.0.Final/debezium-connector-mysql-1.5.0.Final-plugin.tar.gz" --output debezium-connector-mysql-1.5.0.Final-plugin.tar.gztar -xvf debezium-connector-mysql-1.5.0.Final-plugin.tar.gzexit

Once exited, restart the connect container:

docker stop connect
docker start connect
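
As before, a quick check through the Connect REST API (same assumption about port 8083) should now also list the Debezium MySQL connector class:

## io.debezium.connector.mysql.MySqlConnector should appear in the plugin list
curl -s http://localhost:8083/connector-plugins | grep -i debezium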

Connect all the dots (15–20 mins)

Create a Kafka Connect config JSON for Debezium as shown below:

  1. Modify database.hostname -> it is the container ID of the MySQL container (see the lookup command after the JSON below)
  2. Include as many databases (and their tables) as you want via the property database.include.list
{
"name" : "debez_mysql_inventory",
"config" : {
"connector.class" : "io.debezium.connector.mysql.MySqlConnector",
"database.history.kafka.bootstrap.servers" : "broker:29092",
"database.history.kafka.topic" : "schema-changes.inventory",
"database.hostname" : "<MYSQL DOCKER CONTAINER ID>",
"database.include.list" : "inventory",
"database.password" : "mysql",
"database.port" : "3306",
"database.server.id" : "184054",
"database.server.name" : "mysql_cdc_demo",
"database.user" : "root",
"name" : "mysql_inventory_connector",
"snapshot.mode": "schema_only",
"tasks.max" : "1"
}
}
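
To find the container ID for database.hostname, a sketch like the following works; note that since the container was started with --name mysql on the same Docker network, the container name mysql may also resolve as the hostname:

## print the container ID of the running MySQL container
docker ps --filter "name=mysql" --format "{{.ID}}"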

Open the Conduktor app and create a new Kafka Connect connector. Import the JSON as created above: copy the JSON and paste it into the Conduktor Create Kafka Connect UI as follows.
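
If you are not using Conduktor, the same JSON can be registered directly against the Kafka Connect REST API (a sketch, assuming you saved the JSON above as debez_mysql_inventory.json, a hypothetical filename, and the same port 8083):

## register the Debezium source connector via the Connect REST API
curl -s -X POST -H "Content-Type: application/json" --data @debez_mysql_inventory.json http://localhost:8083/connectors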

Create a connector config between the Kafka topic and Snowflake as given below:

{
"name" : "sf_mysql_customers",
"config" : {
"buffer.count.records" : "100",
"buffer.flush.time" : "10",
"buffer.size.bytes" : "1048576",
"connector.class" : "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"name" : "sf_mysql_customers",
"snowflake.database.name" : "kafka_db",
"snowflake.private.key" : "<private key in a single line> as generated in step1",
"snowflake.private.key.passphrase" : "<Passphrase with the private key as was used>",
"snowflake.schema.name" : "public",
"snowflake.topic2table.map" : "mysql_cdc_demo.inventory.customers:mysql_inventory_customers",
"snowflake.url.name" : "<URL with account and cloud provider>.snowflakecomputing.com",
"snowflake.user.name" : "kafka_demo",
"topics" : "mysql_cdc_demo.inventory.customers",
"value.converter" : "com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
"value.converter.schema.registry.url" : "http://schema-registry:8081"
}
}

If any error is thrown during creation, I suggest running the following CREATE USER command once again in Snowflake and testing it out:

CREATE OR REPLACE USER KAFKA_DEMO 
RSA_PUBLIC_KEY = '<PUBLIC KEY AS GENERATED FROM PREVIOUS STEP>'
PASSWORD = '<ANY PLAIN TEXT PASSWORD>'
DEFAULT_ROLE = "KAFKA_CONNECTOR_ROLE"
DEFAULT_WAREHOUSE = 'KAFKA_WH'
DEFAULT_NAMESPACE = 'KAFKA_DEMO.PUBLIC'
MUST_CHANGE_PASSWORD = FALSE;
// GRANT ROLE TO USER
GRANT ROLE KAFKA_CONNECTOR_ROLE TO USER KAFKA_DEMO;

Details of the parameters can be found in the link. Please change the values for private.key and the passphrase in the above JSON snippet. Import the JSON as created above via the Conduktor app as mentioned in the steps before.
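
Whichever way you register it, you can verify that the sink connector and its task are in the RUNNING state via the Connect REST API (same assumption about port 8083; the connector name comes from the JSON above):

## check the status of the Snowflake sink connector
curl -s http://localhost:8083/connectors/sf_mysql_customers/status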

If no errors are seen, then voila!!! We are done with Kafka to Snowflake.

Testing the Whole Process and Seeing the ELT Results

Open a MySQL client like DBeaver and connect to the inventory database (a containerized alternative is shown after the queries). Execute the following queries in the inventory schema:

insert into customers (first_name , last_name , email ) 
select 'ABC', 'v',concat('abc@',LEFT(UUID(), 8)) ;
insert into customers (first_name , last_name , email )
select 'DEF', 'v',concat('def@',LEFT(UUID(), 8)) ;
insert into customers (first_name , last_name , email )
select 'XYZ', 'v',concat('xyz@',LEFT(UUID(), 8)) ;
UPDATE inventory.customers
SET last_name='Fernandes',email='Georgi.Fernandes@abc.co.in',first_name='George'
WHERE first_name ='ABC' ;
UPDATE inventory.customers
SET last_name='Simmele',email='Bezalel.Simmele@def.co.in'
WHERE first_name ='DEF' ;
UPDATE inventory.customers
SET last_name='Kumar',email='Suresh.Kumar@xyz.co.in',first_name='Suresh'
WHERE first_name ='XYZ' ;
insert into customers (first_name , last_name , email )
select 'DEL', 'v',concat('del@',LEFT(UUID(), 8)) ;
delete from customers WHERE first_name ='DEL' ;
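
If you prefer not to install a GUI client, the same statements can be run from the MySQL client bundled inside the container (a sketch, assuming the container name and root password from the docker run command above):

## open an interactive MySQL prompt against the inventory database
docker exec -it mysql mysql -uroot -pmysql inventory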

Wait a minute and you should be able to see data flowing into the dim_customer table in Snowflake. Log in to the Snowflake UI and fire the below queries:

// You should see raw data flowing to the below table
select * from MYSQL_INVENTORY_CUSTOMERS;
// You should see processed data in the below table
select * from dim_customer;

That's all folks!!! You should now be able to see data flowing into Snowflake as you insert/update more records in MySQL continuously.

Troubleshooting tips

If you feel data is not flowing into Snowflake, you may want to check the connect container logs using:

docker logs connect -f 

If no errors are seen in connect, then check roles and access; they are the most likely cause of issues.
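
A quick way to review the grants from the terminal is via SnowSQL (a sketch, reusing the connection flags from the earlier SnowSQL test):

## list what the connector role and the user have been granted
snowsql -a <account_name> -u kafka_demo --private-key-path <BASE PATH>/snowflake-real-time/kafka/keys/rsa_key.p8 -q "SHOW GRANTS TO ROLE KAFKA_CONNECTOR_ROLE"
snowsql -a <account_name> -u kafka_demo --private-key-path <BASE PATH>/snowflake-real-time/kafka/keys/rsa_key.p8 -q "SHOW GRANTS TO USER KAFKA_DEMO"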

If the processed data is not available in the dim_customer table then check the task execution history as given below

// CHECK TASK EXECUTION TO MAKE SURE THERE ARE NO ERRORS
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
SCHEDULED_TIME_RANGE_START=>DATEADD('HOUR',-1,CURRENT_TIMESTAMP()),
RESULT_LIMIT => 10));
