Sunday, June 4, 2017

Configuring Streamsets Data Collector with Hashicorp Vault

     I've been using Streamsets Data Collector a lot lately in my work, and I'm really impressed with it. It has a clean UI and ships with a lot of components out of the box. As the name suggests, the product was built for streaming data, i.e. you turn on a pipeline and just let it run. Now that version 2.5.0 includes the Pipeline Finisher Executor, loading data from RDBMSs in batch has become much easier.

    Since I use this product to connect to various source systems, and since pipelines can be exported to JSON files, there is a real risk of plaintext usernames and passwords lying around. Luckily, Data Collector has native integration with Hashicorp Vault, where we can store our source system usernames and passwords as "secrets". I'm going to walk through, step by step, how to configure and use Vault with Cloudera's Hadoop distribution and Streamsets.

  1. Download the Vault binary from here and make sure to grab the 64-bit version for Linux
  2. Unzip the file to /usr/vault
  3. Create a symbolic link called vault in /usr/bin that points to /usr/vault/vault
  4. Allow the file to be executed: chmod u+x /usr/vault/vault
  5. Create a directory to store the config: mkdir /usr/vault/config.d
  6. Now we need to configure a storage backend, i.e. the place where Vault persists the "secrets". In this example we will use ZooKeeper. By using ZooKeeper we get the high availability that comes with it and don't have to build out an HA solution using one of the other available backends. In the /usr/vault/config.d directory, make a file called config.hcl with the following contents in the Hashicorp Configuration Language:

     backend "zookeeper" {
       address = "<zookeepernode1>:2181,<zookeepernode2>:2181,<zookeepernode3>:2181"
       path = "vault/"
       redirect_addr = "http://<vaultserver>:8200"
     }

     listener "tcp" {
       address = "<vaultserver>:8200"
       tls_disable = 1
     }

     Put in all of your ZooKeeper nodes separated by commas, as well as the name of the server that is going to run Vault. In this example we are not going to cover TLS, so I have it disabled.
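
     If you would rather generate the file than type it, something like the following could work. It is only a sketch: the function name render_vault_config and its two arguments are hypothetical, and the stanzas simply mirror the config.hcl from step 6.

```shell
#!/usr/bin/env bash
# Hypothetical helper: prints a config.hcl like the one in step 6.
# $1 = comma-separated ZooKeeper host:port list
# $2 = host name of the server that will run Vault
render_vault_config() {
  local zk_nodes="$1" vault_host="$2"
  cat <<EOF
backend "zookeeper" {
  address = "${zk_nodes}"
  path = "vault/"
  redirect_addr = "http://${vault_host}:8200"
}

listener "tcp" {
  address = "${vault_host}:8200"
  tls_disable = 1
}
EOF
}
```

     For example, render_vault_config "zk1:2181,zk2:2181,zk3:2181" vault01 > /usr/vault/config.d/config.hcl would write the file, where zk1, zk2, zk3 and vault01 are placeholder host names.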
  7. Now we can start up Vault: vault server -config=/usr/vault/config.d/config.hcl
  8. Hit Ctrl + z, then enter bg to have Vault run in the background
  9. Next we initialize Vault and get the keys (we are using 3 key shares and a threshold of 3, which means you need all 3 keys to unseal the vault in this example):

      vault init -key-shares=3 -key-threshold=3  -address=http://<vaultserver>:8200

      which produced:

      Unseal Key 1: <key1>
      Unseal Key 2: <key2>
      Unseal Key 3: <key3>
      Initial Root Token: <roottoken>

      Make sure to store these keys and the token somewhere safe, because you won't be able to get them back again.

  10. Since we aren't using TLS, we need to set an environment variable so the Vault CLI won't try to use https: export VAULT_ADDR='http://<vaultserver>:8200'
  11. You now have the unseal keys to the vault as well as an initial root token. Use the unseal keys to "unseal" the vault so we can start using it. Here are the commands, using the REST API, that will do this. Until all 3 keys are entered, the vault will remain sealed:

curl \
    -X PUT \
    -d '{"key": "<key1>"}' \

curl \
    -X PUT \
    -d '{"key": "<key2>"}' \

curl \
    -X PUT \
    -d '{"key": "<key3>"}' \

      After running all 3, the vault will "unseal" and the server log will show the following, verifying we have indeed unsealed the vault:

[root@<vaultserver> ~]# 2017/04/28 11:31:08.252492 [INFO ] core: acquired lock, enabling active operation
2017/04/28 11:31:08.302087 [INFO ] core: post-unseal setup starting
2017/04/28 11:31:08.302459 [INFO ] core: loaded wrapping token key
2017/04/28 11:31:08.303398 [INFO ] core: successfully mounted backend: type=generic path=secret/
2017/04/28 11:31:08.303521 [INFO ] core: successfully mounted backend: type=system path=sys/
2017/04/28 11:31:08.303542 [INFO ] core: successfully mounted backend: type=cubbyhole path=cubbyhole/
2017/04/28 11:31:08.303611 [INFO ] rollback: starting rollback manager
2017/04/28 11:31:08.305776 [INFO ] expiration: restoring leases
2017/04/28 11:31:08.307007 [INFO ] core: post-unseal setup complete
2017/04/28 11:31:08.307025 [INFO ] core/startClusterListener: starting listener: listener_address=<vaultserver>:8201
2017/04/28 11:31:08.307417 [INFO ] core/startClusterListener: serving cluster requests: cluster_listen_address=<vaultserver>:8201
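
      The three unseal requests from step 11 can also be wrapped in a loop. This is a sketch that assumes Vault's standard sys/unseal endpoint; the function names and the VAULT_ADDR default are placeholders of my own.

```shell
#!/usr/bin/env bash
# Placeholder address; export VAULT_ADDR to point at your own server.
VAULT_ADDR="${VAULT_ADDR:-http://vaultserver:8200}"

# Build the JSON body for a single unseal request.
unseal_payload() {
  printf '{"key": "%s"}' "$1"
}

# Submit every key passed as an argument, one request per key.
# Assumes the standard /v1/sys/unseal endpoint of Vault's HTTP API.
unseal_vault() {
  local key
  for key in "$@"; do
    curl -s -X PUT -d "$(unseal_payload "$key")" "${VAULT_ADDR}/v1/sys/unseal"
    echo
  done
}
```

      Calling unseal_vault "<key1>" "<key2>" "<key3>" sends the keys in order; each response reports whether the vault is still sealed.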

  12. Now we need to authenticate Streamsets so it can be used with Vault. First we get the user ID for Streamsets by running: /usr/bin/streamsets show-vault-id which will give us our <vaultid> for Streamsets.
  13. To authenticate, we enable the App ID authentication type using the root token:

curl -X POST -H "X-Vault-Token:<roottoken>" -d '{"type":"app-id"}' http://<vaultserver>:8200/v1/sys/auth/app-id

      The Vault server log should report:

core: enabled credential backend: path=app-id/ type=app-id

      Full disclosure here: the App ID authentication type is deprecated and is being replaced by the AppRole authentication type. As of this writing, Streamsets does not yet support AppRole, so we have to go ahead with App ID.
  14. Now we need to create a policy for this App ID to use. This will be the policy used to read "secrets" out of Vault.

curl \
    -X PUT \
    -H "X-Vault-Token:<roottoken>" \
    -H "Content-Type: application/json" \
    -d '{"rules": "{\"path\":{\"secret/*\": {\"policy\": \"read\"}}}"}' \

    We can verify the policy took effect by running:

curl \
  -X GET \
  -H "X-Vault-Token:<roottoken>" \
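
    The rules value in step 14 is itself a JSON document embedded in a JSON string, and escaping it by hand is easy to get wrong. As a sketch, a small helper can do the escaping; the name policy_payload is hypothetical, and it assumes the policy document fits on a single line.

```shell
#!/usr/bin/env bash
# Hypothetical helper: wrap a one-line policy document (itself JSON) in the
# {"rules": "..."} envelope, escaping backslashes and double quotes.
policy_payload() {
  local escaped
  escaped="$(printf '%s' "$1" | sed 's/\\/\\\\/g; s/"/\\"/g')"
  printf '{"rules": "%s"}' "$escaped"
}
```

    The output of policy_payload '{"path":{"secret/*":{"policy":"read"}}}' can then be passed to the -d option of the PUT request in step 14.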

  15. Now we need to create the App ID for Streamsets to use. Try to use a randomly generated UUID like:
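
      One way to generate such a value is sketched here. It assumes either the uuidgen utility or a Linux kernel that exposes /proc/sys/kernel/random/uuid; the function name generate_app_id is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print one random UUID in lowercase.
generate_app_id() {
  if command -v uuidgen >/dev/null 2>&1; then
    uuidgen | tr 'A-Z' 'a-z'
  else
    # Linux fallback when uuidgen is not installed.
    cat /proc/sys/kernel/random/uuid
  fi
}
```

      Running appid="$(generate_app_id)" keeps the value in a shell variable for the steps that follow.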


      This produces the value <appid>, which we will use as our App ID.
  16. Now we will create our App ID authentication and associate the App ID we just generated with the secret policy:

curl -X POST -H "X-Vault-Token:<roottoken>" -d '{"value":"secret-policy", "display":"streamsets"}' \


  17. Next we will associate the Vault ID we obtained from Streamsets with this App ID:

curl -X POST -H "X-Vault-Token:<roottoken>" -d '{"value":"<appid>"}' http://<vaultserver>:8200/v1/auth/app-id/map/user-id/<vaultid>

  18. Let's verify everything was entered properly:

curl -X GET -H "X-Vault-Token:<roottoken>" http://<vaultserver>:8200/v1/auth/app-id/map/app-id/<appid>

curl -X GET -H "X-Vault-Token:<roottoken>" http://<vaultserver>:8200/v1/auth/app-id/map/user-id/<vaultid>
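
      The two mappings from steps 16 and 17 can be sketched as reusable functions. This assumes Vault's documented App ID layout, where map/app-id/<App ID> stores the policy name and map/user-id/<user ID> stores the App ID that user may present. The function names and the VAULT_ADDR default are placeholders of my own.

```shell
#!/usr/bin/env bash
# Placeholder address; export VAULT_ADDR to point at your own server.
VAULT_ADDR="${VAULT_ADDR:-http://vaultserver:8200}"

# URL under which an App ID is mapped to its policy.
app_id_map_url() {
  printf '%s/v1/auth/app-id/map/app-id/%s' "$VAULT_ADDR" "$1"
}

# URL under which a user ID is mapped to the App ID it may use.
user_id_map_url() {
  printf '%s/v1/auth/app-id/map/user-id/%s' "$VAULT_ADDR" "$1"
}

# Hypothetical wrapper that registers both mappings.
# $1 = root token, $2 = App ID, $3 = user ID (the show-vault-id value),
# $4 = policy name
map_app_id() {
  local token="$1" app_id="$2" user_id="$3" policy="$4"
  curl -s -X POST -H "X-Vault-Token:${token}" \
    -d "{\"value\":\"${policy}\"}" "$(app_id_map_url "$app_id")"
  curl -s -X POST -H "X-Vault-Token:${token}" \
    -d "{\"value\":\"${app_id}\"}" "$(user_id_map_url "$user_id")"
}
```

      A GET against either URL with the root token should return the stored value, which is what step 18 checks.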

  19. With all of this behind us, we can finally enter our first set of secrets. Let's store the username "user1" and password "pass1" for a source system we will call "source":
    curl \
     -H "X-Vault-Token: <roottoken>" \
     -H "Content-Type: application/json" \
      -X POST \
     -d '{"value":"user1"}' \

    curl \
      -H "X-Vault-Token: <roottoken>" \
      -H "Content-Type: application/json" \
      -X POST \
      -d '{"value":"pass1"}' \
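
      Since every source system needs the same pair of requests, a helper can save some typing. This sketch assumes the secrets live under secret/<system>/<field>, which is the layout read back in step 20; the function names and the VAULT_ADDR default are hypothetical.

```shell
#!/usr/bin/env bash
# Placeholder address; export VAULT_ADDR to point at your own server.
VAULT_ADDR="${VAULT_ADDR:-http://vaultserver:8200}"

# Build the URL of one secret, e.g. secret/source/username.
secret_url() {
  printf '%s/v1/secret/%s/%s' "$VAULT_ADDR" "$1" "$2"
}

# Hypothetical wrapper: store one value.
# $1 = token, $2 = system, $3 = field, $4 = value
# The value is inserted as-is, so it must not contain " or \ characters.
write_secret() {
  curl -s -X POST \
    -H "X-Vault-Token: $1" \
    -H "Content-Type: application/json" \
    -d "{\"value\":\"$4\"}" \
    "$(secret_url "$2" "$3")"
}
```

      For example, write_secret "<roottoken>" source username user1 followed by write_secret "<roottoken>" source password pass1 would store both secrets for the "source" system.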

  20. You can verify the "secrets" were stored by running something like this:

curl -X GET -H "X-Vault-Token:<roottoken>" http://<vaultserver>:8200/v1/secret/source/username

        Which should return something like this:


  21. Now we need to configure Streamsets. Go into Cloudera Manager and click on the Streamsets service on the main page. On the Streamsets screen, click Configuration at the top of the screen. In the search box type * to filter for Data Collector Advanced Configuration Snippet (Safety Valve) for Type the following in the text box:


Figure 1. Cloudera Manager Configuration for Streamsets

   Click Save Changes on the bottom of the page.

  22. Go back to the main page in Cloudera Manager and restart the Streamsets service to load the config changes, and we're done!

      Now to use Vault, here are a couple of examples:


    When attempting to access the source username and password from a Streamsets pipeline, we simply use the following notation for credentials to access the secrets:

Figure 2. Streamsets Data Collector JDBC Origin Credentials Tab

            Here is an example of using Vault secrets for accessing a JDBC origin in Data Collector.


# Read the username secret from Vault and pull the value out of the JSON response
usercontent=$(curl -s -X GET -H "X-Vault-Token:<roottoken>" http://<vaultserver>:8200/v1/secret/source/username)
username=$(jq -r '.data.value' <<< "${usercontent}")
# Same for the password
pwdcontent=$(curl -s -X GET -H "X-Vault-Token:<roottoken>" http://<vaultserver>:8200/v1/secret/source/password)
password=$(jq -r '.data.value' <<< "${pwdcontent}")
# Import the table with the credentials fetched above
sqoop import -    <user>_<table> -D-Xmx3500m --connect jdbc:oracle:thin:@//<sourceserver>:1521/<servicename> --username "${username}" --password "${password}" -m 1 --table <user>.<table> --as-avrodatafile --target-dir /staging/source/<table> --delete-target-dir
        This bash script connects to our data source via sqoop, using the username and password stored in Vault, and stores the data in HDFS in the Avro format.
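
        One caveat with the script above is that --password puts the secret on the command line, where other users on the box can see it in the process list. A possible variation, sketched here, is to write the password to a private temporary file and hand it to sqoop through its --password-file option. The function name write_password_file is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: write a secret to a private temp file and print its path.
write_password_file() {
  local file
  file="$(mktemp)" || return 1
  # No trailing newline, so the file holds exactly the password.
  printf '%s' "$1" > "$file"
  chmod 400 "$file"
  printf '%s\n' "$file"
}
```

        The printed path can then be given to --password-file in place of --password, and the file removed once the import finishes. Depending on the cluster's default filesystem, a local path may need a file:// prefix.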

   Java (w/ Jackson and Jersey)


import java.io.IOException;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;

public class VaultTest {
    Secret secret;

    // The "data" object of Vault's response; holds the stored value.
    public static class Data {
        public String value;
    }

    // Mirrors the JSON envelope Vault returns when a secret is read.
    public static class Secret {
        public String request_id;
        public String lease_id;
        public boolean renewable;
        public int lease_duration;
        public Data data;
        public String wrap_info;
        public String warnings;
        public String auth;
    }

    public void executeTest() throws JsonParseException, JsonMappingException,
            ClientHandlerException, UniformInterfaceException, IOException {

        Client client = Client.create();

        WebResource webResource = client.resource("http://<vaultserver>:8200/v1/secret/source/username");
        ClientResponse response = webResource.accept("application/json")
                .header("X-Vault-Token", "<roottoken>").get(ClientResponse.class);
        if (response.getStatus() != 500) {

            if (response.getStatus() != 200) {
                throw new RuntimeException("Failed : HTTP error code : " + response.getStatus());
            }

            // Deserialize the JSON response body into the Secret class.
            ObjectMapper mapper = new ObjectMapper();
            secret = mapper.readValue(response.getEntity(String.class), Secret.class);

            // Print the secret's value to the CLI.
            System.out.println(secret.data.value);
        }
    }

    public static void main(String[] args) throws Exception {
        new VaultTest().executeTest();
    }
}


    [root@<vaultserver> ~]# java -jar /var/tmp/vaulttest/VaultTest.jar

      In this example we're using a Java application with Jersey and Jackson to pull the secret from Vault's REST API, deserialize it, and print it out to the CLI.