Use Philter with Apache Flink

Apache Flink is a powerful stream-processing platform. By using Philter with Apache Flink, you can find and remove sensitive information as it moves through your Flink streaming pipelines. See Philter’s features for more details.

Prerequisites

An instance of Philter is required. You can launch one in the cloud or as a Docker container. If you are running in AWS, CloudFormation and Terraform scripts are available for launching a single Philter instance or a load-balanced, auto-scaled set of Philter instances.

Integrating Philter with Apache Flink

You can use Philter to find and remove sensitive information from streaming text with Apache Flink. In this example, the Flink application consumes text from an Apache Kafka topic, uses Philter to find and remove sensitive information from the text, and then publishes the filtered text to a different Kafka topic.

The code shown on this page is available as a project on GitHub.

We will use Philter’s Java SDK to connect to Philter’s REST API from the Flink application. If performance is critical, please contact us about a native Philter implementation that does not require making REST API calls and can be invoked directly from the Flink code.

We add the Philter Java SDK dependency to our project:

<dependency>
  <groupId>com.mtnfog</groupId>
  <artifactId>philter-sdk-java</artifactId>
  <version>1.2.0</version>
</dependency>

We will extend Flink’s RichMapFunction to create our own map function. The new class configures the Philter client and handles the communication with Philter.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Philter SDK classes; adjust the package names to match the SDK version in use.
import com.mtnfog.philter.PhilterClient;
import com.mtnfog.philter.model.FilterResponse;

public class FilterMapRestFunction extends RichMapFunction<String, String> {

    private PhilterClient philterClient;
    private String context;
    private String filterProfileName;

    public FilterMapRestFunction(String context, String filterProfileName) {
        this.context = context;
        this.filterProfileName = filterProfileName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {

        // Create the Philter client once per task. Change the endpoint to the
        // address of your Philter instance.
        this.philterClient = new PhilterClient.PhilterClientBuilder().withEndpoint("https://127.0.0.1:8080").build();

    }

    @Override
    public String map(String s) throws Exception {

        // Send the text to Philter and return the filtered text.
        final FilterResponse filterResponse = philterClient.filter(context, "document", filterProfileName, s);

        return filterResponse.getFilteredText();

    }

    @Override
    public void close() throws Exception {

        // Nothing to clean up.

    }

}

In the code above, we create a map function that takes the raw text and returns the filtered text after making a call to Philter. The context (an arbitrary string) and the name of the filter profile are provided through the class’s constructor. (You can use “default” as the filter profile name if you have not created any custom filter profiles.) When making the call to Philter, we pass “document” as the document ID; like the context, this is an arbitrary string that can be used to identify the document.

Be sure to change the address of the Philter endpoint in the open function.
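If you would rather not hardcode the endpoint, one option (a sketch, not part of the original example) is to pass it through the constructor along with the other settings; the field and parameter names here are illustrative:

// Hypothetical variation: the endpoint is supplied by the caller rather than
// being hardcoded in open().
private String endpoint;

public FilterMapRestFunction(String endpoint, String context, String filterProfileName) {
    this.endpoint = endpoint;
    this.context = context;
    this.filterProfileName = filterProfileName;
}

@Override
public void open(Configuration parameters) throws Exception {
    this.philterClient = new PhilterClient.PhilterClientBuilder().withEndpoint(endpoint).build();
}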

To complete the Flink application, we need a function that returns a FlinkKafkaConsumer<String> and a function that returns a FlinkKafkaProducer<String>. These functions configure the connections to the source and destination Kafka topics.
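A minimal sketch of what those functions might look like, assuming the flink-connector-kafka dependency is on the classpath; the topic names, broker address, group ID, and the KafkaConnectors class name are placeholders for illustration:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaConnectors {

    // Consumes the raw text from the source Kafka topic.
    public static FlinkKafkaConsumer<String> getConsumer() {
        final Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "philter-flink");
        return new FlinkKafkaConsumer<>("unfiltered-text", new SimpleStringSchema(), properties);
    }

    // Publishes the filtered text to the destination Kafka topic.
    public static FlinkKafkaProducer<String> getProducer() {
        final Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        return new FlinkKafkaProducer<>("filtered-text", new SimpleStringSchema(), properties);
    }

}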

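Putting it together, the body of the Flink job might look like the following sketch; the class name, job name, and context string are illustrative, and the KafkaConnectors helpers are the hypothetical ones sketched above:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PhilterStreamingJob {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the raw text from the source Kafka topic.
        final DataStream<String> rawText = env.addSource(KafkaConnectors.getConsumer());

        // Remove sensitive information from each message using Philter.
        final DataStream<String> filteredText = rawText.map(new FilterMapRestFunction("my-context", "default"));

        // Publish the filtered text to the destination Kafka topic.
        filteredText.addSink(KafkaConnectors.getProducer());

        env.execute("philter-streaming");

    }

}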
Once we build the project into a runnable jar, we can execute the jar to start the Flink application. Flink will begin consuming from the source Kafka topic, processing the text with Philter, and publishing the filtered text to the destination Kafka topic.

For the full code see the mtnfog/philter-streaming project on GitHub.