Change Data Capture (CDC) with NiFi and Snowflake

Snowflake and Apache NiFi are two dominant technologies in today’s data landscape: the former offers cloud-based, user-friendly data lakehouse and analytics capabilities, whilst the latter is a de facto open-source standard for data movement and streaming solutions. It’s hardly surprising that they can be integrated with each other, unlocking many potential use cases and forming a powerful combination for any enterprise architecture.

We’ve already posted quite a few blog posts about these two technologies (check them out here!), but in today’s entry, we’ll explore how they can work together. Simulating a Change Data Capture (CDC) scenario, we will use NiFi to ingest new data and copy it to Snowflake, in real time, as it appears in the original source.

Note: the inspiration behind this blog post came from this YouTube video by Pierre Villard, director for Data in Motion at Cloudera.

Snowpipe and NiFi

To achieve this real-time data ingestion we will use Snowpipe, a serverless Snowflake service that lets you load data into a Snowflake table as soon as it is available in a stage, which is either a location in your Cloud Service Provider (Google Cloud, Azure or AWS) or a Snowflake-provided stage attached to your table or user. By creating a pipe with a simple COPY statement, you can ensure that the data loaded in the stage is automatically reflected in the desired table.
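
A pipe is essentially a named wrapper around a COPY INTO statement. As a minimal sketch (with assumed object names, not the ones used later in this post), a pipe that auto-ingests files from an external stage backed by a cloud bucket could look like this:

-- assumed names: MY_DB, MY_SCHEMA, MY_PIPE, MY_TABLE, MY_EXTERNAL_STAGE
create or replace pipe MY_DB.MY_SCHEMA.MY_PIPE auto_ingest=true as
COPY INTO MY_DB.MY_SCHEMA.MY_TABLE
FROM @MY_DB.MY_SCHEMA.MY_EXTERNAL_STAGE
FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1);

In our use case we will take the other route: a pipe with auto_ingest=false reading from a table stage, triggered explicitly by NiFi through the Snowpipe API, as we will see below.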

Snowpipe architecture
Figure 1: Snowpipe architecture (Source: https://www.snowflake.com/wp-content/uploads/2017/11/snowpipe‑1.png)

NiFi’s integration with Snowflake harnesses three new processors designed specifically to work with Snowpipe:

  • PutSnowflakeInternalStage
  • StartSnowflakeIngest
  • GetSnowflakeIngestStatus

These processors allow us to bypass the complexity and labour-intensive nature of traditional JDBC connections (for example, the classic PutDatabaseRecord processor), leading to reduced configuration time and increased performance.

We’ll take a proper look at these new processors when we describe our pipeline in the upcoming sections.

Preparing the Use Case

To build our simulated scenario, we use CSV files containing data from a public dataset (Ethereum Historical Dataset | Kaggle), generated by a Python script that we have set up to run every minute. These files are stored in a local directory on our laptop, where NiFi is running, constantly checking whether new files have been created. When these files appear, the NiFi pipeline sends them over to Snowflake, as we will see below.

In Snowflake, we created a table where we want this data to be copied:

create or replace TABLE CLEARPEAKS.FINANCE.ETHEREUM (
	"Date" TIMESTAMP_NTZ(9),
	"Symbol" VARCHAR(16777216),
	"Open" FLOAT,
	"High" FLOAT,
	"Low" FLOAT,
	"Close" FLOAT,
	"Volume" FLOAT
);

After creating the table, we also created the pipe to be used by our NiFi ingestion pipeline by running the following command. Notice how the CSV file details are part of the pipe definition:

create or replace pipe CLEARPEAKS.FINANCE.ETHEREUM_PIPE auto_ingest=false as
COPY INTO ETHEREUM
FROM @%ETHEREUM
FILE_FORMAT = (TYPE = 'CSV', SKIP_HEADER = 1, FIELD_DELIMITER = ',', FIELD_OPTIONALLY_ENCLOSED_BY = '"');
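
To double-check that the pipe exists and is ready to receive ingest requests, you can optionally run something along these lines in a Snowflake worksheet; SYSTEM$PIPE_STATUS returns a JSON document with the pipe's execution state and the number of files pending ingestion:

show pipes in schema CLEARPEAKS.FINANCE;
select SYSTEM$PIPE_STATUS('CLEARPEAKS.FINANCE.ETHEREUM_PIPE');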

Lastly, we downloaded the three new Snowflake-native NiFi processors (and the required services) from the following links:

https://mvnrepository.com/artifact/org.apache.nifi/nifi-snowflake-processors-nar
https://mvnrepository.com/artifact/org.apache.nifi/nifi-snowflake-services-api-nar
https://mvnrepository.com/artifact/org.apache.nifi/nifi-snowflake-services-nar

To set them up, we placed these NAR files in the lib directory of the NiFi installation folder. This location serves as a repository for the various libraries and extensions that NiFi relies on for its operations. After restarting NiFi, the new processors and services are available for us to use.

The NiFi Flow

In the picture below we can see the NiFi flow that we used to run our CDC use case. At a high level, it mainly consists of two parts:

  • Change Data Capture: The ListFile and FetchFile processors are a classic combo that allows NiFi to detect and fetch new files as they land in a specific location. This enables us to simulate the case in which a source application is releasing new files (in our case, the Ethereum data) which we want our NiFi pipeline to ingest in real time. Of course, this is not the only way to implement CDC: we could use their SFTP counterparts (ListSFTP and FetchSFTP), or even QueryDatabaseTable, to detect newly added or updated rows in a database table. Other hybrid solutions are also possible (for example, using Oracle GoldenGate and Kafka).
  • Snowpipe ingestion: This part is based on the three new processors that we mentioned earlier: PutSnowflakeInternalStage, StartSnowflakeIngest, and GetSnowflakeIngestStatus. As their names suggest, we use them respectively to put the data in a Snowflake stage, to trigger the start of the related Snowpipe pipe, and then to query the status of the pipe until it completes:
NiFi Flow
Figure 2: NiFi Flow

Let’s take a detailed look at the settings of each of these processors.

ListFile Processor

The ListFile processor lists the contents of a local directory, filtered (if required) by name, type, path, age, or timestamp. By selecting the Tracking Timestamps option for the Listing Strategy parameter, we make sure that only new files are considered at every execution. We could also filter by type by changing the File Filter regular expression to match only files whose names end with .csv, but in our case, we know that nothing else will land in that directory.

ListFile processor configuration parameters
Figure 3: ListFile processor configuration parameters

FetchFile Processor

The FetchFile processor works in tandem with ListFile. Using the incoming FlowFiles’ attributes to construct the absolute path and name of the files to fetch, it accesses the local directory and generates a FlowFile for each of those files:

FetchFile processor configuration parameters
Figure 4: FetchFile processor configuration parameters

In the image below we can see an example of some files fetched after a few executions of the above processors, as visible in the FetchFile output queue:

List queue including all CSV files ingested by the FetchFile processor
Figure 5: List queue including all CSV files ingested by the FetchFile processor

These are the files that we now pass to the new Snowflake processors for ingestion into our Snowflake table.

PutSnowflakeInternalStage Processor

The PutSnowflakeInternalStage processor makes sure that the incoming FlowFiles are copied to a Snowflake stage. As we can see below, its configuration is very straightforward: all it requires is a connection to Snowflake (provided as a Controller Service), the type of Snowflake internal stage, and the stage details. In our case, we select the Table type, and thus we provide the name of the Snowflake database, along with the schema and table to which this stage corresponds. After running this processor, our CSV files will be copied to this stage:

PutSnowflakeInternalStage processor configuration parameters
Figure 6: PutSnowflakeInternalStage processor configuration parameters

Note that the Table stage is an internal stage in Snowflake, so we did not have to create it (more details available here).
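
If you want to verify on the Snowflake side that the files actually reached the table stage, you can list its contents from a worksheet (the @% prefix refers to a table stage):

list @%ETHEREUM;
-- or, fully qualified: list @CLEARPEAKS.FINANCE.%ETHEREUM;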

Below you can see the details of the Snowflake Connection Provider, which essentially consists of a JDBC connection string (‘jdbc:snowflake://’ + the account URL) plus the username and password to connect to it:

Snowflake connection provider details
Figure 7: Snowflake connection provider details

StartSnowflakeIngest Processor

At this point, our files are in the Snowflake stage, but these records do not appear in our table yet. To make this happen, we need to start the Snowpipe pipe, and the StartSnowflakeIngest processor does exactly that. Its configuration requires only an existing Ingest Manager Provider Controller Service:

StartSnowflakeIngest processor configuration parameters
Figure 8: StartSnowflakeIngest processor configuration parameters

The settings for the Ingest Manager Provider are shown below. We need to input the Snowflake URL, the username, the pipe details (database, schema, and pipe name), and a StandardPrivateKeyService:

Snowflake ingest manager provider details
Figure 9: Snowflake ingest manager provider details

The StandardPrivateKeyService is required to connect to the Snowflake API (which is what the StartSnowflakeIngest processor really does under the hood). It requires a key file or, alternatively, a key and a key password:

Private key service details
Figure 10: Private key service details

To build one, we follow the steps provided by Snowflake at this link. These steps are relatively straightforward, and at the end, if you run the DESC USER command on your user, you should see an output like this:

Execution of the DESC USER command after generating a Key pair
Figure 11: Execution of the DESC USER command after generating a key pair
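
For reference, the Snowflake side of that procedure essentially boils down to attaching the generated public key to the user that NiFi connects with, and then checking it with DESC USER. A minimal sketch, assuming a hypothetical user called NIFI_USER and a truncated placeholder key value:

alter user NIFI_USER set RSA_PUBLIC_KEY='MIIBIjANBgkqh...';  -- NIFI_USER and the key value are placeholders
desc user NIFI_USER;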

All we need to do now is copy the generated private key, together with its password, into the Key and Key Password parameters of the Private Key Service.

Now that everything is ready, we can start the processor. However, note that the result of the ingestion is not immediately available: with the next processor, we wait until the pipe has completed its job.

GetSnowflakeIngestStatus Processor

The GetSnowflakeIngestStatus processor, as mentioned above, allows us to check the status of our pipe and to get notified when the ingestion is completed. As we can see below, all it requires is the Ingest Manager Provider created earlier:

GetSnowflakeIngestStatus processor configuration parameters
Figure 12: GetSnowflakeIngestStatus processor configuration parameters

While it runs, the FlowFiles representing the CSV files being ingested are kept in the retry queue; once the ingestion is completed, they are moved to the success queue, and the process is over!

GetSnowflakeIngestStatus processor configuration parameters
Figure 13: GetSnowflakeIngestStatus processor configuration parameters
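
If you ever need to troubleshoot this step from the Snowflake side as well, the COPY_HISTORY table function offers a convenient (if slightly delayed) view of what the pipe has loaded. A possible check, assuming the ingestion happened within the last hour, would be:

select FILE_NAME, STATUS, ROW_COUNT, FIRST_ERROR_MESSAGE
from table(CLEARPEAKS.FINANCE.INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'ETHEREUM',
    START_TIME => dateadd('hour', -1, current_timestamp())
));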

Querying the Data in Snowflake

Once our pipeline has completed, we can go to Snowflake and run a SELECT query on our target table to confirm that the ingestion was indeed successful:

Result of the SELECT query on the Snowflake table after the ingestion has completed
Figure 14: Result of the SELECT query on the Snowflake table after the ingestion has completed
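
A simple query along these lines is enough for a sanity check; remember that the columns were created as quoted identifiers, so their names are case-sensitive and must be quoted:

select "Date", "Symbol", "Open", "Close", "Volume"
from CLEARPEAKS.FINANCE.ETHEREUM
order by "Date" desc
limit 10;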

If we keep our NiFi pipeline running, newly created CSV files are automatically added to this table: our ListFile and FetchFile processors detect them as they land in the local directory and push them down the pipeline to be ingested into Snowflake.

Conclusion

In this blog post, we’ve seen how the new NiFi processors for Snowpipe allow us to easily build a real-time CDC scenario where new data is automatically and seamlessly ingested into Snowflake. As we mentioned, we simulated a simple case with new files being added to a local directory, but many more (and possibly more elaborate) use cases are possible in NiFi, thanks to its flexibility and ease of use. It’s also great to see how a modern tool like Snowflake opens the door to such integrations, with services like Snowpipe along with a powerful API.

Here at synvert, we’re passionate about both technologies, and our certified, field-tested experts are ready to assist you. Should you require support or wish to delve deeper into the intricacies of NiFi and Snowflake, please do not hesitate to contact us!