Apache NiFi for Processing PHI Data

With the recent release of Apache NiFi 1.10.0, it seems like a good time to discuss using Apache NiFi with data containing protected health information (PHI). When data contains PHI, regulations such as HIPAA can impose significant requirements that you may not face otherwise.

Apache NiFi probably needs little introduction but in case you are new to it, Apache NiFi is a big-data ETL application that uses directed graphs called data flows to move and transform data. You can think of it as taking data from one place to another while, optionally, doing some transformation to the data. The data goes through the flow in a construct known as a flow file. In this post we'll consider a simple data flow that reads files from a remote SFTP server and uploads the files to S3. We don't need to look at a complex data flow to understand how PHI can impact our setup.

Encryption of Data at Rest and In-motion

Two core things to address when PHI data is present are encryption of the data at rest and encryption of the data in motion. The first step is to identify those places where sensitive data will be at rest and in motion.

For encryption of data at rest, the first location is the remote SFTP server. In this example, let's assume the remote SFTP server is not managed by us, has the appropriate safeguards, and is someone else's responsibility. As the data goes through the NiFi flow, the next place the data is at rest is inside NiFi's provenance repository. (The provenance repository stores the history of all flow files that pass through the data flow.) NiFi then uploads the files to S3. AWS gives us the capability to encrypt S3 bucket contents by default, so we will enable default encryption on the bucket.
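
If you manage the bucket with the AWS CLI, enabling default encryption looks roughly like the following (the bucket name is a placeholder; adjust the algorithm if you prefer SSE-KMS):

aws s3api put-bucket-encryption \
  --bucket my-phi-bucket \
  --server-side-encryption-configuration '{"Rules":[{"ApplyServerSideEncryptionByDefault":{"SSEAlgorithm":"AES256"}}]}'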

For encryption of data in motion, we have the connection between the SFTP server and NiFi and between NiFi and S3. Since we are using an SFTP server, our communication to the SFTP server will be encrypted. Similarly, we will access S3 over HTTPS providing encryption there as well.

If we are using a multi-node NiFi cluster, we also have to consider the communication between the NiFi nodes in the cluster. If the flows only execute on a single node you may argue that encryption between the nodes is not necessary. However, what happens in the future when the flow's behavior is changed and PHI data is suddenly being transmitted in plain text across a network? For that reason, it's best to set up encryption between NiFi nodes from the start. This is covered in the NiFi System Administrator's Guide.
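
As a rough sketch, securing node-to-node communication comes down to giving each node a keystore and truststore and marking the cluster protocol as secure in conf/nifi.properties. The paths and passwords below are placeholders; the full procedure is in the Administrator's Guide.

nifi.cluster.protocol.is.secure=true
nifi.security.keystore=/opt/nifi/conf/keystore.jks
nifi.security.keystoreType=JKS
nifi.security.keystorePasswd=changeit
nifi.security.keyPasswd=changeit
nifi.security.truststore=/opt/nifi/conf/truststore.jks
nifi.security.truststoreType=JKS
nifi.security.truststorePasswd=changeit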

Encrypting Apache NiFi's Data at Rest

The best way to ensure encryption of data at rest is to use full disk encryption for the NiFi instances. (If you are on AWS and running NiFi on EC2 instances, use an encrypted EBS volume.) This ensures that all data persisted on the system will be encrypted no matter where the data appears. If a NiFi processor decides to have a bad day and dump error data to the log there is a risk of PHI data being included in the log. With full disk encryption we can be sure that even that data is encrypted as well.
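
If you are scripting the infrastructure with the AWS CLI, you can opt the whole region into encrypted EBS volumes by default, or request encryption when creating an individual volume. The region, availability zone, size, and key alias below are examples only:

# Encrypt all new EBS volumes in this region by default
aws ec2 enable-ebs-encryption-by-default --region us-east-1

# Or create an individual encrypted volume for NiFi's repositories
aws ec2 create-volume \
  --availability-zone us-east-1a \
  --size 500 \
  --volume-type gp2 \
  --encrypted \
  --kms-key-id alias/aws/ebs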

Looking at Other Methods

Let's recap the NiFi repositories where flow file data is persisted to disk:

  • FlowFile repository - stores the attributes and current state of each flow file in the flow.
  • Content repository - stores the actual content of the flow files.
  • Provenance repository - stores the provenance events (history) for the flow files.

PHI could exist in any of these repositories when PHI data is passing through a NiFi flow. NiFi does have an encrypted provenance repository implementation and NiFi 1.10.0 introduces an experimental encrypted content repository but there are some caveats. (Currently, NiFi does not have an implementation of an encrypted flowfile repository.)
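
For reference, turning on the encrypted provenance repository is a matter of switching the implementation and configuring a key provider in conf/nifi.properties. The snippet below is only a sketch; the key is a placeholder and the exact property names and key provider classes should be verified against the Administrator's Guide for your NiFi version.

nifi.provenance.repository.implementation=org.apache.nifi.provenance.EncryptedWriteAheadProvenanceRepository
nifi.provenance.repository.encryption.key.provider.implementation=org.apache.nifi.security.kms.StaticKeyProvider
nifi.provenance.repository.encryption.key.id=Key1
nifi.provenance.repository.encryption.key=<64-character hex key placeholder>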

Even when using these encrypted repository implementations, spillage of PHI onto the file system through a log file or some other means remains a risk, and there is a bit of overhead from the additional CPU instructions needed to perform the encryption. With an encrypted EBS volume, by contrast, we don't have to worry about spilling unencrypted PHI to the disk, and per the AWS EBS encryption documentation, "You can expect the same IOPS performance on encrypted volumes as on unencrypted volumes, with a minimal effect on latency."

There is also the NiFi EncryptContent processor that can encrypt (and decrypt despite the name!) the content of flow files. This processor has its uses, but only in very specific cases. Trying to encrypt data at the level of the data flow for compliance reasons is not recommended because the data may still exist unencrypted elsewhere in the NiFi repositories.

Removing PHI from Text in a NiFi Flow

What if you want to remove PHI (and PII) from the content of flow files as they go through a NiFi data flow? Check out our product Philter. It provides the ability to find and remove many types of PHI and PII from natural language, unstructured text from within a NiFi flow. Text containing PHI is sent to Philter, and Philter responds with the same text but with the PHI and PII removed.

Conclusion

Full disk encryption and encrypting all connections in the NiFi flow and between NiFi nodes provide encryption of data at rest and in motion. It's also recommended that you check with your organization's compliance officer to determine if there are any other requirements imposed by your organization or other relevant regulations prior to deployment. It's best to gather that information up front to avoid rework in the future!

Need more help?

We provide consulting services around AWS and big-data tools like Apache NiFi. Get in touch by sending us a message. We look forward to hearing from you!

A Tool for Every Data Engineer’s Toolbox

Collecting data from edge devices in manufacturing, processing medical records from electronic health systems, and analyzing text all sound like very different problems, each requiring a unique solution. While that is certainly true, there are commonalities between these tasks. Each requires a scalable method of data ingestion, predictable performance, and capabilities for management and monitoring. Projects like these also typically require the ability to track data lineage as it moves through the pipeline and the ability to replay data. Once we abstract out these commonalities, the projects are actually not all that different. In each case, data is being consumed and ingested to be analyzed or processed.

A tool that satisfies those common requirements would be invaluable to a data engineer. One such tool is Apache NiFi, an application that allows data engineers to create directed graphs of data flows using an intuitive web interface. Through NiFi’s construct called a processor, data can be ingested, manipulated, and persisted. Data and software engineers no longer have to write custom code to implement data pipelines. With Apache NiFi, creating a pipeline is as simple as dragging and dropping processors onto its canvas and applying appropriate configuration.

To help illustrate the capabilities of Apache NiFi, consider a recent project that required translating documents of varying languages, stored in an Apache Kafka topic, into a single language. The pipeline required consuming the documents from the topic, determining the language of each document, and selecting the appropriate translation service. Apache NiFi’s ConsumeKafka processor handled the ingestion of documents, an InvokeHTTP processor powered the web service request to determine each document’s source language, and a RouteOnAttribute processor directed the flow based on the document’s language to the appropriate InvokeHTTP processor that sent the text to a translation service. The resulting translated documents were then persisted to S3.

A few years back, building a pipeline to do this would likely have required writing custom code, whether for consuming from a queue, communicating with the language translation services, or persisting the results to a remote store. Not writing custom code also usually translates to saving time and money. Apache NiFi is one tool that should definitely exist in every data engineer’s toolbox. As with any tool, it is important to understand NiFi’s capabilities and limitations, and the Apache NiFi User Guide is a good place to start.

This article was originally posted to Medium.

Need more help?

We provide consulting services around AWS and big-data tools like Apache NiFi. Get in touch by sending us a message. We look forward to hearing from you!

Some First Steps for a New NiFi Cluster

After installing Apache NiFi there are a few steps you might want to take before making your cluster available for prime time. None of these steps are required so make sure they are appropriate for your use-case before implementing them.

Lowering NiFi's Log File Retention Properties

By default, Apache NiFi's nifi-app.log files are capped at 100 MB per log file and NiFi retains 30 log files. If the maximum is reached that comes out to 3 GB of disk space from nifi-app.log files. That's not a whole lot, but in some cases you may need the extra disk space. Or, if an external service is already managing your NiFi log files, you don't need them hanging around any longer than necessary. To lower the thresholds, open NiFi's conf/logback.xml file. Under the appender configuration for nifi-app.log you will see maxFileSize and maxHistory values. Lower these values, save the file, and restart NiFi to save disk space on log files. Conversely, if you want to keep more log files just increase those limits.
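
For reference, the relevant portion of conf/logback.xml should look similar to the following (the exact layout can differ between NiFi versions):

<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
    <fileNamePattern>${org.apache.nifi.bootstrap.config.log.dir}/nifi-app_%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
    <!-- Lower these values to reduce disk usage; raise them to retain more history -->
    <maxFileSize>100MB</maxFileSize>
    <maxHistory>30</maxHistory>
</rollingPolicy>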

You can also make changes to the handling of the nifi-user.log and nifi-bootstrap.log files here, too. But those files typically don't grow as fast as the nifi-app.log so they can often be left as-is. Note that in a cluster you will need to make these changes on each node.

Install NiFi as a Service

Having NiFi run as a service allows it to automatically start when the system starts and provides easier access for starting and stopping NiFi. Note that in a cluster you will need to make these changes on each node. To install NiFi as a system service, go to NiFi's bin/ directory and run the following commands (on Ubuntu):

sudo ./nifi.sh install
sudo update-rc.d nifi defaults

You can now control the NiFi service with the commands:

sudo systemctl status nifi
sudo systemctl start nifi
sudo systemctl stop nifi
sudo systemctl restart nifi

If running NiFi in a container add the install commands to your Dockerfile.

Install (and use!) the NiFi Registry

The NiFi Registry provides the ability to put your flows under source control. It has quickly become an invaluable tool for NiFi. The NiFi Registry should be installed outside of your cluster but accessible to your cluster. The NiFi Registry Documentation contains instructions on how to install it, create buckets, and connect it to your NiFi cluster.
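
As a quick sketch, installing and starting the Registry on a standalone host looks something like this. The version number and install path are examples only; check the downloads page for the current release.

wget https://archive.apache.org/dist/nifi/nifi-registry/nifi-registry-0.5.0/nifi-registry-0.5.0-bin.tar.gz
tar xzf nifi-registry-0.5.0-bin.tar.gz -C /opt
/opt/nifi-registry-0.5.0/bin/nifi-registry.sh start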

By default the NiFi Registry listens on port 18080 so be sure your firewall rules allow for the communication. Remember, you only need a single installation of the NiFi Registry per NiFi cluster. If you are using infrastructure-as-code to deploy your NiFi cluster make sure the scripts to deploy the NiFi Registry are outside the cluster scripts. You don't want the NiFi Registry's lifecycle to be tied to the NiFi cluster's lifecycle. This allows you to create and teardown NiFi clusters without affecting your NiFi Registry. It also allows you to share your NiFi Registry between multiple clusters if you need to.

Although using the NiFi Registry is not required to make a data flow in NiFi, your life will be much, much easier if you do use the NiFi Registry, especially in an environment where multiple users will be manipulating the data flow.

Need more help?

We provide consulting services around AWS and big-data tools like Apache NiFi. Get in touch by sending us a message. We look forward to hearing from you!

Monitoring Apache NiFi's Logs with AWS CloudWatch

It's inevitable that at some point while running Apache NiFi on a single node or as a cluster you will want to see what's in NiFi's log and maybe even be alerted when certain logged events are found. Maybe you are debugging your own processor or just looking for more insight into your data flow. With the AWS CloudWatch Logs agent we can send NiFi's log files to CloudWatch for aggregation, storage, and alerting.

Creating an IAM Role and Policy

We'll mostly be following this Quick Start. Before installing the CloudWatch Logs agent, the instances need permission to save the logs, so we will first create a new IAM role for our NiFi instances in EC2. (If your NiFi instances already have an existing role attached you can just edit that role.) After creating a new role, add a new JSON policy to it:

For copy/paste ease, the policy is:

{
   "Version":"2012-10-17",
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "logs:CreateLogGroup",
            "logs:CreateLogStream",
            "logs:PutLogEvents",
            "logs:DescribeLogStreams"
         ],
         "Resource":[
            "arn:aws:logs:*:*:*"
         ]
      }
   ]
}

Click Review Policy, give the policy a name, like cloud-watch-logs, and click Create Policy. This policy can now be attached to an IAM role. Click through, give your role a name, such as nifi-instance-role, and click Create Role. Now we can attach this role to our NiFi instances.
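
If you prefer the AWS CLI to the console, the equivalent steps look roughly like this. The account ID and instance ID are placeholders, and the policy JSON above is assumed to be saved as cloud-watch-logs.json:

# Create the policy from the JSON document above
aws iam create-policy --policy-name cloud-watch-logs --policy-document file://cloud-watch-logs.json

# Create a role that EC2 instances can assume and attach the policy to it
aws iam create-role --role-name nifi-instance-role \
  --assume-role-policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":"ec2.amazonaws.com"},"Action":"sts:AssumeRole"}]}'
aws iam attach-role-policy --role-name nifi-instance-role \
  --policy-arn arn:aws:iam::123456789012:policy/cloud-watch-logs

# Expose the role to EC2 through an instance profile and attach it to a NiFi instance
aws iam create-instance-profile --instance-profile-name nifi-instance-role
aws iam add-role-to-instance-profile --instance-profile-name nifi-instance-role --role-name nifi-instance-role
aws ec2 associate-iam-instance-profile --instance-id i-0123456789abcdef0 \
  --iam-instance-profile Name=nifi-instance-role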

Install CloudWatch Logs Agent

Now that our NiFi EC2 instances have access to store the logs in CloudWatch Logs we can install the CloudWatch Logs agent on the instance. Because we are running Ubuntu and not Amazon Linux we'll install the agent manually.

curl https://s3.amazonaws.com/aws-cloudwatch/downloads/latest/awslogs-agent-setup.py -O

sudo python ./awslogs-agent-setup.py --region us-east-1

If it gives you an error that the command python cannot be found you probably don't have python (2) installed. You can quickly install it:

sudo apt-get install python

When prompted for an AWS Access Key ID and AWS Secret Access Key, press enter to skip both. If your instances are running in a region other than us-east-1, enter it now. Press enter to skip the default output format. The next prompts ask for the location of the syslog; you can press enter to accept the default of /var/log/syslog. For the log stream name I recommend using the EC2 instance id, which is the default option. Next, select the log event timestamp format; the first option is recommended, so press enter to accept it or make a different selection. Finally, the agent asks where to start uploading. The first option will upload the whole log file while the second option will start at the end of the file. For completeness, I recommend the first option, so press enter.

When asked if there are more log files to configure, press enter for yes. Now we will specify NiFi's application log. Our NiFi is installed at /opt/nifi/ so replace /opt/nifi/ with your NiFi directory in the responses below.

Path of log file to upload: /opt/nifi/logs/nifi-app.log
Destination Log Group Name: /opt/nifi/logs/nifi-app.log
Choose Log Stream name: 1. Use EC2 instance id
Choose Log Event timestamp format: 1. %b %d %H:%M:%S (Dec 31 23:59:59)
Choose initial position of upload: 1. From start of file.

Repeat these steps to add any other log files such as nifi-bootstrap.log and nifi-user.log. For convenience, the relevant contents of my /var/awslogs/etc/awslogs.conf file are below:

[/var/log/syslog]
datetime_format = %b %d %H:%M:%S
file = /var/log/syslog
buffer_duration = 5000
log_stream_name = {instance_id}
initial_position = start_of_file
log_group_name = /var/log/syslog

[/opt/nifi/logs/nifi-app.log]
datetime_format = %b %d %H:%M:%S
file = /opt/nifi/logs/nifi-app.log
buffer_duration = 5000
log_stream_name = {instance_id}
initial_position = start_of_file
log_group_name = /opt/nifi/logs/nifi-app.log

[/opt/nifi/logs/nifi-bootstrap.log]
datetime_format = %b %d %H:%M:%S
file = /opt/nifi/logs/nifi-bootstrap.log
buffer_duration = 5000
log_stream_name = {instance_id}
initial_position = start_of_file
log_group_name = /opt/nifi/logs/nifi-bootstrap.log

[/opt/nifi/logs/nifi-user.log]
datetime_format = %b %d %H:%M:%S
file = /opt/nifi/logs/nifi-user.log
buffer_duration = 5000
log_stream_name = {instance_id}
initial_position = start_of_file
log_group_name = /opt/nifi/logs/nifi-user.log

After making manual changes to this file be sure to restart the CloudWatch Logs Agent service.

sudo service awslogs restart

With the service configured and restarted it will now be sending logs to CloudWatch Logs.

Check Out the Logs!

Navigating back to the AWS Console and going to CloudWatch we can now see our NiFi logs under the Logs section.

Because we selected the EC2 instance ID as the log_stream_name the logs will be grouped by instance ID. It may be more convenient for you to use a hostname instead of the instance ID.

By having all of our NiFi logs aggregated in a single place we no longer have to SSH into each host to look at the log files!

Create Custom Log Filter

We can also now create custom filters on the logs. For example, to quickly see any error messages we can create a new Logs Metric Filter with the Filter Pattern ERROR. This will create a metric for lines that contain the filter. If you want the filter to look for something more specific you can adjust the Filter Pattern as needed.

Click the Assign Metric button to continue.

Next, name the filter and assign it a metric value, then click Create Filter. We now have our metric filter!

With this filter we can create alarms that watch for static thresholds or anomalies. For example, generate an alarm if more than two ERROR messages are found in the log within a period of 5 minutes. Alternatively, we can utilize CloudWatch's anomaly detection instead of static values; in that case, CloudWatch monitors the metric's standard deviation and generates an alarm when the condition threshold is met.
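
The same filter and alarm can also be scripted with the AWS CLI. The metric name, namespace, and SNS topic ARN below are placeholders:

# Count ERROR lines in the nifi-app.log log group
aws logs put-metric-filter \
  --log-group-name /opt/nifi/logs/nifi-app.log \
  --filter-name nifi-app-errors \
  --filter-pattern "ERROR" \
  --metric-transformations metricName=NiFiAppLogErrors,metricNamespace=NiFi,metricValue=1

# Alarm when more than two errors are logged within a 5 minute period
aws cloudwatch put-metric-alarm \
  --alarm-name nifi-app-log-errors \
  --metric-name NiFiAppLogErrors \
  --namespace NiFi \
  --statistic Sum \
  --period 300 \
  --evaluation-periods 1 \
  --threshold 2 \
  --comparison-operator GreaterThanThreshold \
  --treat-missing-data notBreaching \
  --alarm-actions arn:aws:sns:us-east-1:123456789012:nifi-alerts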

Monitoring for ERROR messages in the log is a useful, even if trivial, example but I think it shows the value in utilizing CloudWatch Logs to capture NiFi's logs and building custom metrics and alarms on them.

Need more help?

We provide consulting services around AWS and big-data tools like Apache NiFi. Get in touch by sending us a message. We look forward to hearing from you!

Monitoring Apache NiFi with Datadog

One of the most common requirements when using Apache NiFi is a means to adequately monitor the NiFi cluster. Insights into a NiFi cluster's use of memory, disk space, CPU, and NiFi-level metrics are crucial to operating and optimizing data flows. NiFi's Reporting Tasks provide the capability to publish metrics to external services.

Datadog is a hosted service for collecting, visualizing, and alerting on metrics. With Apache NiFi's built-in DataDogReportingTask, we can leverage Datadog to monitor our NiFi instances. In this blog post we are running a 3 node NiFi cluster in Amazon EC2. Each node is on its own EC2 instance.

Note that the intention of this blog post is not to promote Datadog but instead to demonstrate one potential platform for monitoring Apache NiFi.

If you don't already have a Datadog account, the first thing to do is create one. Once done, install the Datadog Agent on your NiFi hosts. The command will look similar to the following except it will have your API key. In the command below we are installing the Datadog agent on an Ubuntu instance. If you just want to monitor NiFi-level metrics you can skip this step; however, we find the host-level metrics to be valuable as well.

DD_API_KEY=xxxxxxxxxxxxxxx bash -c "$(curl -L https://raw.githubusercontent.com/DataDog/datadog-agent/master/cmd/agent/install_script.sh)"

This command will download and install the Datadog agent on the system. The Datadog service will be started automatically and will run on system start.

Creating a Reporting Task

The next step is to create a DataDogReportingTask in NiFi. In NiFi's Controller Settings under Reporting Tasks, click to add a new Reporting Task and select Datadog. In the Reporting Task's settings, enter your Datadog API key and change the other values as desired.

By default, NiFi Reporting Tasks run every 5 minutes. You can change this period under the Settings tab's "Run Schedule" value if needed. Click Apply to save the reporting task.

The reporting task will now be listed. Click the Play icon to start the reporting task. Apache NiFi will now send metrics to Datadog every 5 minutes (unless you changed the Run Schedule value to a different interval).

Exploring NiFi Metrics in Datadog

We can now go to Datadog and explore the metrics from NiFi. Open the Metrics Explorer and enter "nifi" (without the quotes) and the available NiFi metrics will be displayed. These metrics can be included in graphs and other visuals in Datadog dashboards. (If you're interested, the names of the metrics originate in MetricNames.java.)

Creating a Datadog Dashboard for Apache NiFi

By creating a new dashboard, we can add NiFi metrics to it. For example, in the dashboard shown below we added line graphs to show the CPU usage and JVM heap usage for each of the NiFi nodes.

The DataDogReportingTask provides a convenient but powerful method of publishing Apache NiFi metrics. The Datadog dashboards can be configured to provide a comprehensive look into the performance of your Apache NiFi cluster.

What we have shown here is really the tip of the iceberg for making a comprehensive monitoring dashboard. With NiFi's metrics and Datadog's flexibility, how the dashboard is created is completely up to you and your needs.

Need more help?

We provide consulting services around AWS and big-data tools like Apache NiFi. Get in touch by sending us a message. We look forward to hearing from you!

Apache NiFi's MergeContent Processor

The MergeContent processor in Apache NiFi is one of the most useful processors but can also be one of the biggest sources of confusion. The processor (you guessed it!) merges flowfiles together based on a merge strategy. The processor's purpose is straightforward but its properties can be tricky. In this post we describe how it can be used to merge previously split flowfiles together.

In a recent project, a NiFi flow was split into two separate pipelines. Both pipelines executed independently, and when both were complete their flowfiles were merged back into a single flowfile. The MergeContent processor will use Defragment as the Merge Strategy. In MergeContent-speak, the split flowfiles become fragments. There are two fragments and we can refer to them as 0 and 1. Which is which doesn't really matter; what does matter is that each index is unique and less than the fragment count (2).

Merging by Defragment

Using the Defragment merge strategy requires some attributes to be placed on the flowfile. Those attributes are:

  • fragment.identifier - All flowfiles with the same fragment.identifier will be grouped together.
  • fragment.count - The count of flowfile fragments, i.e. how many splits do we have? (All flowfiles having the same fragment.identifier must have the same value for fragment.count.)
  • fragment.index - Identifies the index of the flowfile in the group. All flowfiles in the group must have a unique fragment.index value that is between 0 and the count of fragments.

You can set these attributes using an UpdateAttribute processor. In the screenshot below, our flowfile was previously split into 5 fragments. The common attribute value across each of the fragments is some.id. This UpdateAttribute processor is setting this flowfile as index 0.

UpdateAttribute

With these attributes set, when flowfiles reach the MergeContent processor it will know how to combine them. As flowfiles come into the MergeContent processor the value of the fragment.identifier attribute will be read. The MergeContent will bin the flowfiles based on this attribute. When the count of binned flowfiles equals the fragment.count the flowfiles will be merged together. This means that the MergeContent's Maximum Number of Bins property should be equal to or greater than the number of fragment.identifiers processed concurrently (source). So, if your flow has 100 flowfiles with unique fragment.identifier attribute values being processed at any given time you will want to have at least 100 bins.

The other properties of the MergeContent processor are mostly self-explanatory. For example, if you are merging text you can set the Demarcator property to separate the text. The Header and Footer properties allow you to sandwich the combined text with some values.

MergeContent on a Multi-Node NiFi Cluster

It's important to remember that in a distributed NiFi cluster the MergeContent processor requires all fragments to be on the same node. An easy way to catch this is when flowfiles get "stuck" in the transition to the MergeContent processor and their positions are the same. (The same flowfile fragments are both at position N.) This means that one part of the flowfile is on one node and the other part is on the other node, hence the stuck flowfiles. You need to ensure that the upstream flow is putting the flowfiles onto the same NiFi node. One way to do this is to set the load balance strategy to partition and use ${some.id} as the attribute. This will ensure that flowfiles with the same value for the some.id attribute will be routed to the same node. (For more on load balancing check out this blog post from the Apache NiFi project.)

Partition by Attribute

Need more help?

We provide consulting services around AWS and big-data tools like Apache NiFi. Get in touch by sending us a message. We look forward to hearing from you!

Orchestrating NLP Building Blocks with Apache NiFi for Named-Entity Extraction

This blog post shows how we can create an NLP pipeline to perform named-entity extraction on natural language text using our NLP Building Blocks and Apache NiFi. Our NLP Building Blocks provide the ability to perform sentence extraction, string tokenization, and named-entity extraction. They are implemented as microservices and can be deployed almost anywhere, such as AWS, Azure, and as Docker containers.

At the completion of this blog post we will have a system that reads natural language text stored in files on the file system, pulls out the sentences of each, finds the tokens in each sentence, and finds the named-entities in the tokens.

Apache NiFi is an open-source application that provides data flow capabilities. Using NiFi you can visually define how data should flow through your system. Using what NiFi calls "processors", you can ingest data from many data sources, perform operations on the data such as transformations and aggregations, and then output the data to an external system. We will be using NiFi to facilitate the flow of text through our NLP pipeline. The text will be read from plain text files on the file system. We will then:

  • Identify the sentences in input text.
  • For each sentence, extract the tokens in the sentence.
  • Process the tokens for named-entities.

To get started we will stand up the NLP Building Blocks. This consists of the following applications:

  • Prose Sentence Extraction Engine - extracts the sentences from natural language text.
  • Sonnet Tokenization Engine - breaks sentences into tokens.
  • Idyl E3 Entity Extraction Engine - finds named-entities in the tokens.

We will launch these applications using a docker-compose script.

git clone https://github.com/mtnfog/nlp-building-blocks
cd nlp-building-blocks
docker-compose up

This will pull the docker images from DockerHub and run the containers. We now have each NLP building block up and running. Let's get Apache NiFi up and running, too.

To get started with Apache NiFi we will download it. It is a big download at just over 1 GB. You can download it from the Apache NiFi Downloads page or directly from a mirror at this link for NiFi 1.4.0. Once the download is done we will unzip the download and start NiFi:

unzip nifi-1.4.0-bin.zip
cd nifi-1.4.0/bin
./nifi.sh start

NiFi will start and after a few minutes it will be available at http://localhost:8080/nifi. (If you are curious you can see the NiFi log under logs/nifi-app.log.) Open your browser to that page and you will see the NiFi canvas as shown below. We can now design our data flow around the NLP Building Blocks!

If you want to skip to the meat and potatoes you can get the NiFi template described below in the nlp-building-blocks repository.

Our source data is going to be read from text files on our computer stored under /tmp/in/. We will use NiFi's GetFile processor to read the file. Add a GetFile processor to the canvas:


Right-click the GetFile processor and click Configure to bring up the processor's properties. The only property we are going to set is the Input Directory property. Set it to /tmp/in/ and click Apply:

We will use the InvokeHTTP processor to send API requests to the NLP Building Blocks, so, add a new InvokeHTTP processor to the canvas:

This first InvokeHTTP processor will be used to send the data to Prose Sentence Extraction Engine to extract the sentences in the text. Open the InvokeHTTP processor's properties and set the following values:

  • HTTP Method - POST
  • Remote URL - http://localhost:7070/api/sentences
  • Content-Type - text/plain

Set the processor to autoterminate for everything except Response. We also set the processor's name to ProseSentenceExtractionEngine. Since we will be using multiple InvokeHTTP processors this lets us easily differentiate between them. We can now create a connection between the GetFile and InvokeHTTP processors by clicking and drawing a line between them. Our flow right now reads files from the filesystem and sends the contents to Prose:
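
You can sanity check this request outside of NiFi with curl. This mirrors what the InvokeHTTP processor sends, assuming the Prose container is listening on port 7070 as configured above:

# Returns a JSON array of the sentences found in the posted text
curl -X POST -H "Content-Type: text/plain" \
  --data "George Washington was president. This is another sentence." \
  http://localhost:7070/api/sentences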

The sentences returned from Prose will be in a JSON array. We can split this array into individual FlowFiles with the SplitJson processor. Add a SplitJson processor to the canvas and set its JsonPath Expression property to $.* as shown below:

Connect the ProseSentenceExtractionEngine processor to the SplitJson processor using the Response relationship. The canvas should now look like this:

Now that we have the individual sentences in the text we can send those sentences to Sonnet Tokenization Engine to tokenize the sentences. Similar to before, add an InvokeHTTP processor and name it SonnetTokenizationEngine. Set its method to POST, the Remote URL to http://localhost:9040/api/tokenize, and the Content-Type to text/plain. Automatically terminate every relationship except Response. Connect it to the SplitJson processor using the Split relationship. The result of this processor will be an array of tokens from the input sentence.

While we are at it, let's go ahead and add an InvokeHTTP processor for Idyl E3 Entity Extraction Engine. Add the processor to the canvas and set its name to IdylE3EntityExtractionEngine. Set its properties:

  • HTTP Method - POST
  • Remote URL - http://localhost:9000/api/extract
  • Content-Type - application/json

Connect the SonnetTokenizationEngine processor to the IdylE3EntityExtractionEngine processor via the Response relationship. All other relationships can be set to autoterminate. To make things easier to see, we are going to add an UpdateAttribute processor that sets the filename for each FlowFile to a random UUID. Add an UpdateAttribute processor and add a new property called filename with the value ${uuid}.txt. We will also add a processor to write the FlowFiles to disk so we can see what happened during the flow's execution. We will add a PutFile processor and set its Directory property to /tmp/out/.

Our finished flow looks like this:

To test our flow we are going to use a super simple text file. The full contents of the text file are:

George Washington was president. This is another sentence. Martha Washington was first lady.

Save this file as /tmp/in/test.txt.

If the NLP Building Blocks are not still running from earlier, start them up:

git clone https://github.com/mtnfog/nlp-building-blocks
cd nlp-building-blocks
docker-compose up

Now you can start the processors in the flow! The file /tmp/in/test.txt will disappear and three files will appear in /tmp/out/. The three files will have random UUIDs for filenames thanks to the UpdateAttribute processor. If we look at the contents of each of these files we see:

First file:

{"entities":[{"text":"George Washington","confidence":0.96,"span":{"tokenStart":0,"tokenEnd":2},"type":"person","languageCode":"eng","extractionDate":1514488188929,"metadata":{"x-model-filename":"mtnfog-en-person.bin"}}],"extractionTime":84}

Second file:

{"entities":[],"extractionTime":7}

Third file:

{"entities":[{"text":"Martha Washington","confidence":0.89,"span":{"tokenStart":0,"tokenEnd":2},"type":"person","languageCode":"eng","extractionDate":1514488189026,"metadata":{"x-model-filename":"mtnfog-en-person.bin"}}],"extractionTime":2}

The input text was broken into three sentences so we have three output files. In the first file we see that George Washington was extracted as a person entity. The second file did not have any entities. The third file had Martha Washington as a person entity. Our NLP pipeline orchestrated by Apache NiFi read the input, broke it into sentences, broke each sentence into tokens, and then identified named-entities from the tokens.

This flow assumed the language would always be English but if you are unsure you can add another InvokeHTTP processor to utilize Renku Language Detection Engine. This will enable language detection inside your flow and you can route the FlowFiles through the flow based on the detected language giving you a very powerful NLP pipeline.

There's a lot of cool stuff here but arguably one of the coolest is that by using the NLP Building Blocks you don't have to pay per-request pricing that many of the NLP services charge. You can run this pipeline as much as you need to. And if you are in an environment where your text can't leave your network, this pipeline can be run completely behind a firewall (just like we did in this post).