Thursday, May 28, 2015

Loading JSON Files with Nested Arrays from Azure Blob Storage into Hive Tables in HDInsight

     In my previous post I wrote about how to upload JSON files into Azure blob storage. In this post, I'd like to expand upon that and show how to load these files into Hive in Azure HDInsight.

     
     The two JSON files I'm going to load are up in my blob storage, acronym/abc.txt and acronym/def.txt:


Figure 1. JSON Files in Azure Blob Storage

 Both of these files came from a public RESTful API:

http://www.nactem.ac.uk/software/acromine/dictionary.py?sf=abc

and

http://www.nactem.ac.uk/software/acromine/dictionary.py?sf=def


The format for these two files looks something like this:


{
  "a": [
    {
      "sf": "ABC",
      "lfs": [
        {
          "lf": "ATP-binding cassette",
          "freq": 1437,
          "since": 1990,
          "vars": [
            {
              "lf": "ATP-binding cassette",
              "freq": 1057,
              "since": 1990
            }
          ]
        }
      ]
    }
  ]
}
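To make the path expressions in the Hive queries below easier to follow, here is a small Python sketch (my own illustration, not part of the original post) that walks this structure the same way the Hive path '$.a[0]' does:

```python
import json

# A trimmed copy of the sample above; the outer {"a": ...} object wraps
# the API's top-level array (the second post below explains why).
doc = json.loads("""
{"a": [{"sf": "ABC",
        "lfs": [{"lf": "ATP-binding cassette", "freq": 1437, "since": 1990,
                 "vars": [{"lf": "ATP-binding cassette",
                           "freq": 1057, "since": 1990}]}]}]}
""")

# The Hive path '$.a[0]' selects the first element of array "a";
# each further step descends one level of the hierarchy.
sf = doc["a"][0]["sf"]                          # "ABC"
first_var = doc["a"][0]["lfs"][0]["vars"][0]    # first vars record
print(sf, first_var["freq"])
```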

     First thing I'd like to do is create an external table in Hive, where I'm going to "load" the raw JSON files, so we can play around a little with some of the out-of-the-box Hive functions for JSON. In the Hive Query Editor:


DROP TABLE IF EXISTS AcronymRaw;

CREATE EXTERNAL TABLE AcronymRaw (json string)
STORED AS TEXTFILE LOCATION 'wasb://jymbo@jymbo.blob.core.windows.net/acronym/';

If we execute the following we can see the raw JSON:


SET hive.execution.engine=tez;

SELECT * FROM AcronymRaw;

We will get back the raw JSON from the files. (I have to add the tez switch since I did not configure my cluster to use this engine by default.) If I want to dive into the first array of my JSON objects and see the acronyms, I can use a lateral view, to flatten out the hierarchy, combined with a json_tuple function:

SET hive.execution.engine=tez;

SELECT
  b.sf
FROM AcronymRaw AS a LATERAL VIEW json_tuple(get_json_object(a.json, '$.a[0]'), 'sf') b AS sf;

This will give me:

Figure 2. JSON Acronym Results
If we want to step into the first array of the next level in the hierarchy, lfs, we can add another lateral view to the statement:

SET hive.execution.engine=tez;
SET hive.cli.print.header=true;

SELECT
  b.sf,
  c.lf,
  c.freq,
  c.since
FROM AcronymRaw AS a
LATERAL VIEW json_tuple(get_json_object(a.json, '$.a[0]'), 'sf', 'lfs') b AS sf, lfs
LATERAL VIEW json_tuple(get_json_object(concat('{"a":', b.lfs, '}'), '$.a[0]'), 'lf', 'freq', 'since') c AS lf, freq, since;


Figure 3. First Nested JSON Array
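The concat() in that second lateral view deserves a note: json_tuple hands back b.lfs as a plain string containing a JSON array, and get_json_object needs an object root to index into it. Wrapping the string in {"a": ...} rebuilds one. A Python sketch of the same trick (my own illustration, with a sample value standing in for b.lfs):

```python
import json

# b.lfs arrives from json_tuple as a *string* holding a JSON array:
lfs_text = '[{"lf": "ATP-binding cassette", "freq": 1437, "since": 1990}]'

# concat('{"a":', b.lfs, '}') rebuilds an object so '$.a[0]' can
# index into the array again; the equivalent string surgery here:
wrapped = '{"a":' + lfs_text + '}'
first = json.loads(wrapped)["a"][0]
print(first["lf"], first["freq"])
```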


     Now to get this JSON loaded into an external table that will match the structure of the JSON, we're going to incorporate a third-party Java library that can deserialize the JSON for us. This will make querying the data much easier. You can download and build the project using Maven from this github link, or you can be lazy, like me, and get the JARs from here. Using Visual Studio you can upload these files to blob storage so they can be referenced in Hive:


Figure 4. Visual Studio Server Explorer
       
     After adding these files to Blob storage we can reference them in our Hive statements when creating and referencing tables using this library:


ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar;
ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT.jar;

DROP TABLE IF EXISTS AcronymNonNormalized;

CREATE EXTERNAL TABLE AcronymNonNormalized (
  a array<struct<
        sf:string,
        lfs:array<struct<
          lf:string, freq:int, since:int,
          vars:array<struct<lf:string, freq:int, since:int>>
        >>
      >>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE LOCATION 'wasb://jymbo@jymbo.blob.core.windows.net/acronym/';

     Now that this structure is in a Hive table, we can query it much easier than in the raw JSON format. In this query we can explode the lfs arrays and project them alongside sf:

ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar;
ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT.jar;

SET hive.execution.engine=tez;
SET hive.cli.print.header=true;

SELECT
  a[0].sf,
  lfs.lf,
  lfs.freq,
  lfs.since
FROM AcronymNonNormalized LATERAL VIEW EXPLODE(a[0].lfs) lfstable AS lfs;

Figure 5. Exploded Results
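What EXPLODE does here is emit one output row per array element, repeating the outer columns alongside each one. A quick Python sketch of that row duplication (my own illustration; the second lfs element is made up to show the fan-out):

```python
# One record shaped like a row of AcronymNonNormalized; the second
# lfs entry is hypothetical, added to show one row per array element.
record = {"sf": "ABC",
          "lfs": [{"lf": "ATP-binding cassette", "freq": 1437, "since": 1990},
                  {"lf": "hypothetical long form", "freq": 3, "since": 2001}]}

# LATERAL VIEW EXPLODE(a[0].lfs) behaves like this comprehension:
# the outer column sf is repeated for every element of the lfs array.
rows = [(record["sf"], lfs["lf"], lfs["freq"], lfs["since"])
        for lfs in record["lfs"]]
print(rows)
```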
     We can even drill into the second-tier array vars using another lateral view and explode:


ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar;
ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT.jar;

SET hive.execution.engine=tez;
SET hive.cli.print.header=true;

SELECT
  a[0].sf,
  lfs.lf AS mainlf,
  vars.lf,
  vars.freq,
  vars.since
FROM AcronymNonNormalized LATERAL VIEW EXPLODE(a[0].lfs) lfstable AS lfs
LATERAL VIEW EXPLODE(lfstable.lfs.vars) vartable AS vars;

Figure 6. Second Nested Array
     
     So now that we've figured out how to traverse the arrays in the table, we could potentially normalize this table and create a schema similar to this:


Figure 7. Normalized Schema


 The first table we need to populate will be AcronymSF:

ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar;
ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT.jar;

SET hive.execution.engine=tez;
SET hive.cli.print.header=true;

DROP TABLE IF EXISTS AcronymSF;

CREATE TABLE AcronymSF
(
  sf string  -- pk
);

INSERT OVERWRITE TABLE AcronymSF
SELECT
  a[0].sf
FROM AcronymNonNormalized;

SELECT * FROM AcronymSF;

Figure 8. Acronym SF Contents


     For us to populate the rest of the tables, we will need to produce unique values for the arrays in these hierarchies that will be part of a composite primary key. To do this we will take advantage of a Hive function called posexplode, which emits the zero-based position of each array element alongside the element itself, so we can uniquely identify them. The first table we want to create and populate like this will be the AcronymLFS table:

ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar;
ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT.jar;

SET hive.execution.engine=tez;
SET hive.cli.print.header=true;

DROP TABLE IF EXISTS AcronymLFS;

CREATE TABLE AcronymLFS
(
  sf string,  -- pk
  lfs int,    -- pk
  lf string,
  freq int,
  since int
);

INSERT OVERWRITE TABLE AcronymLFS
SELECT
  a[0].sf,
  lfstable.seq AS lfs,
  lfs.lf,
  lfs.freq,
  lfs.since
FROM AcronymNonNormalized LATERAL VIEW POSEXPLODE(a[0].lfs) lfstable AS seq, lfs;

SELECT * FROM AcronymLFS;

Figure 9. AcronymLFS Contents
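The seq column posexplode produces works just like Python's enumerate, which may make the pattern easier to picture (my own sketch; the second array element is hypothetical):

```python
# An lfs array as stored in AcronymNonNormalized; the second element
# is made up to show the position counter incrementing.
lfs_array = [{"lf": "ATP-binding cassette"},
             {"lf": "hypothetical variant"}]

# POSEXPLODE pairs each element with its zero-based position,
# exactly like enumerate:
rows = [(seq, lfs["lf"]) for seq, lfs in enumerate(lfs_array)]
print(rows)  # [(0, 'ATP-binding cassette'), (1, 'hypothetical variant')]
```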
     
     To load the AcronymVARS table we need to use the same method we used for AcronymLFS, but one level down on the JSON hierarchy:

ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar;
ADD JAR wasb://jymbo@jymbo.blob.core.windows.net/user/hdp/share/lib/hive/json-serde-1.1.9.3-SNAPSHOT.jar;

SET hive.execution.engine=tez;
SET hive.cli.print.header=true;

DROP TABLE IF EXISTS AcronymVARS;

CREATE TABLE AcronymVARS
(
  sf string,  -- pk
  lfs int,    -- pk
  vars int,   -- pk
  lf string,
  freq int,
  since int
);

INSERT OVERWRITE TABLE AcronymVARS
SELECT
  a[0].sf,
  lfstable.seq AS lfs,
  vartable.seq AS vars,
  vars.lf,
  vars.freq,
  vars.since
FROM AcronymNonNormalized LATERAL VIEW POSEXPLODE(a[0].lfs) lfstable AS seq, lfs
LATERAL VIEW POSEXPLODE(lfstable.lfs.vars) vartable AS seq, vars;

SELECT * FROM AcronymVARS;

Figure 10. AcronymVARS Content

     So with this we have normalized our JSON. If we execute a standard SQL query in Hive like this:

SET hive.execution.engine=tez;
SET hive.cli.print.header=true;

SELECT
  tsf.sf,
  tlfs.lf AS mainlf,
  tvars.lf,
  tvars.freq,
  tvars.since
FROM AcronymSF tsf
JOIN AcronymLFS tlfs ON tsf.sf = tlfs.sf
JOIN AcronymVARS tvars ON tlfs.sf = tvars.sf AND tlfs.lfs = tvars.lfs;

We get this:

Figure 11. SQL Join Result
     
     As you can see, these results match exactly the results from the query in figure 6. Now for performance purposes you would want to leave all this data in the same table (nested arrays and all). But if you wanted to set this data up to be moved to a relational database with Sqoop, going through this technique may be useful.
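The whole normalization can be sanity-checked in a few lines of Python (my own sketch; the values come from the abc sample shown earlier): build the three tables with position counters, then join them back and confirm you get the nested-query rows.

```python
# One record shaped like a row of AcronymNonNormalized (abc sample).
record = {"sf": "ABC",
          "lfs": [{"lf": "ATP-binding cassette", "freq": 1437, "since": 1990,
                   "vars": [{"lf": "ATP-binding cassette",
                             "freq": 1057, "since": 1990}]}]}

# The three normalized tables, with enumerate standing in for posexplode.
acronym_sf = [(record["sf"],)]
acronym_lfs = [(record["sf"], i, lfs["lf"], lfs["freq"], lfs["since"])
               for i, lfs in enumerate(record["lfs"])]
acronym_vars = [(record["sf"], i, j, v["lf"], v["freq"], v["since"])
                for i, lfs in enumerate(record["lfs"])
                for j, v in enumerate(lfs["vars"])]

# Joining on (sf) and (sf, lfs) reproduces the nested-array query rows.
joined = [(sf, l[2], v[3], v[4], v[5])
          for (sf,) in acronym_sf
          for l in acronym_lfs if l[0] == sf
          for v in acronym_vars if v[0] == l[0] and v[1] == l[1]]
print(joined)
```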

Tuesday, May 26, 2015

Uploading a JSON Feed into Azure Blob Storage using an ASP.NET Web API

     Been diving into Microsoft's cloud offering and wanted to share a little exercise I went through to get a JSON feed into Azure storage. This assumes that Azure storage is already set up for your account. The wizards make this pretty easy to execute.

     For this example I used a public JSON feed from the National Centre for Text Mining in the UK (thus the spellchecker yelling at me for "Centre"). They have a RESTful API that you can submit acronyms to, and it will return words that may fit those acronyms in a JSON format that looks something like this:

(Example for ABC)


[
  {
    "sf": "ABC",
    "lfs": [
      {
        "lf": "ATP-binding cassette",
        "freq": 1437,
        "since": 1990,
        "vars": [
          {
            "lf": "ATP-binding cassette",
            "freq": 1057,
            "since": 1990
          }
        ]
      }
    ]
  }
]


     To easily and repeatably get this data into Azure Storage, I'm going to create my own RESTful Web API in .net to assist me. To do this I'm going to create a new ASP.NET Web Application in Visual Studio:


Figure 1. ASP.NET Project

     When prompted, choose Web API:


Figure 2. Template Selection Screen

     This will have created our project and we can get started. First thing we want to do is get the configurations right so that we can connect to our Azure Storage. In the Azure storage area, select which storage container you want the JSON feed to be stored in:


Figure 3. Azure Storage Page
Then click on Manage Access Keys in the menu at the bottom of the screen:


Figure 4. Manage Access Keys

     Here you will need to grab your storage account name and primary access key for our project. We are going to need these values for the Web.config in our Visual Studio solution. Open up Web.config and add values in <appSettings>:


<appSettings>
    <add key="StorageConnectionString" value="DefaultEndpointsProtocol=https;AccountName=jymbo;AccountKey=wWlWqJoG+sZou5m/..." />
</appSettings>


The second place we need to edit is <connectionStrings>:

<connectionStrings>
    <add name="StorageConnectionString" connectionString="DefaultEndpointsProtocol=https;AccountName=jymbo;AccountKey=wWlWqJoG+sZou5m/..." />
</connectionStrings>

To be able to write to cloud storage we'll need to add some namespaces to our project. For this we can use NuGet. Right-click on the project and click Manage NuGet Packages:

Figure 5. Manage NuGet Packages

Do a search for Windows Azure Storage and click Install to add it to your project.

Figure 6. Windows Azure Storage

We can finally start coding! Now we need to add a controller to our project that can accept inputs to our API:

Figure 7. Add Controller 

Choose Web API 2 Controller - Empty:

Figure 8. Controller Type

For this example I named my controller BlobController:

Figure 9. Controller Name 

In BlobController.cs paste this code:


using System;
using System.Net;
using System.Web.Http;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Auth;
using Microsoft.WindowsAzure.Storage.Blob;
using System.IO;
namespace JSONBlobStorage.Controllers
{
    public class BlobController : ApiController
    {
        CloudBlockBlob blockBlob;
        string wUrl = "http://www.nactem.ac.uk/software/acromine/dictionary.py?sf=";
        string jsonString;
        // GET: api/Blob/5
        public string Get([FromUri]string acronym)
        {
            ConnectToAzure(acronym);
            LoadJSONTOBlobStorage(acronym);
            return "File containing JSON for acronym "+acronym+" has been uploaded";
        }
        private void ConnectToAzure(string acronym)
        {
            // Retrieve storage account from connection string.
            CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
            CloudConfigurationManager.GetSetting("StorageConnectionString"));
            // Create the blob client.
            CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
            // Retrieve a reference to a container.
            CloudBlobContainer container = blobClient.GetContainerReference("jymbo");
            // Create the container if it doesn't already exist.
            container.CreateIfNotExists();
            // Retrieve reference to a blob named "Acronym*".
            blockBlob = container.GetBlockBlobReference("acronym/"+acronym+".txt");
        }
        private void LoadJSONTOBlobStorage(string acronym)
        {
            HttpWebRequest httpWReq = (HttpWebRequest)WebRequest.Create(wUrl+acronym);
            HttpWebResponse httpWResp = (HttpWebResponse)httpWReq.GetResponse();
            //Test the connection
            if (httpWResp.StatusCode == HttpStatusCode.OK)
            {
                Stream responseStream = httpWResp.GetResponseStream();
                //Set jsonString using a stream reader
                using (StreamReader reader = new StreamReader(responseStream))
                {
                    jsonString =reader.ReadToEnd();
                }
                //Place top level array in an object a
                jsonString = "{\"a\":" + jsonString.TrimEnd('\r', '\n') + "}";
                //Upload JSON to blob storage
                blockBlob.UploadText(jsonString);
            }
        }
    }
}
I did not add any exception handling, as this was only for demonstration purposes. We have a Get method that will accept an acronym parameter that we want to pass to the acronym API, a ConnectToAzure method that will authenticate us to Azure blob storage, and a LoadJSONTOBlobStorage method that will pull the JSON from the RESTful API and upload it to blob storage. You'll notice that I read the stream into a string and concatenate the JSON with { "a": ... }. This is to wrap the top-level array we receive in an object so our JSON will be easier to manage when we deserialize:

{
  "a": [
    {
      "sf": "ABC",
      "lfs": [
        {
          "lf": "ATP-binding cassette",
          "freq": 1437,
          "since": 1990,
          "vars": [
            {
              "lf": "ATP-binding cassette",
              "freq": 1057,
              "since": 1990
            }
          ]
        }
      ]
    }
  ]
}
If I did not need to edit the JSON before storage, I could have called blockBlob.UploadFromStream(responseStream), passed the stream, and gotten the same results. I got the code for the Azure authentication here. This example is using C#, but there are a couple of other examples using other languages in that post.
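The wrapping step itself is trivial to express in any language. Here is a Python sketch of the same concatenation (my own illustration; the helper name and the sample feed string are made up):

```python
import json

def wrap_top_level_array(json_text: str) -> str:
    """Mirror of the C# concatenation above: wrap the API's top-level
    array in an object keyed 'a' so deserializers see an object root."""
    return '{"a":' + json_text.rstrip("\r\n") + "}"

# A stand-in for the Acromine response body:
feed = '[{"sf": "ABC", "lfs": []}]'
wrapped = wrap_top_level_array(feed)
print(json.loads(wrapped)["a"][0]["sf"])
```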


     Now we need to publish this project to Azure. Right click the project and choose Publish:

Figure 10. Publish

On the Publish Web screen select Microsoft Azure Web Apps:


Figure 11. Publish Web Screen

This will prompt you to log into your Azure account. When prompted for existing or new, click the New button:


Figure 12. Existing Web App Screen

Then enter a name for your web app and click Create:


Figure 13. Choose App Name Screen


You will then go through a series of screens to validate your entries; here you can just click the Publish button to create your site. Your Output window should be giving you a status of what is being pushed up to Azure. When completed you should see something like this:

Figure 14. Output Window

Once completed, your site should launch in your browser. Now we're ready to load some JSON to Azure Storage. The API takes a single parameter for the acronym. Once we pass this parameter, the Web API will call the RESTful web service, return our JSON, and upload it to Azure Storage.

     To test it out we can call http://jsonblobstorageexample.azurewebsites.net/api/Blob?acronym=abc, which should upload a JSON file of acronym data for ABC and produce the following ugly message:

<string xmlns="http://schemas.microsoft.com/2003/10/Serialization/">
File containing JSON for acronym abc has been uploaded
</string>
If we look in our Azure storage account we should now see that file:

Figure 15. Azure Storage

We can use our server explorer in Visual Studio to download the file and verify the contents:

Figure 16. Visual Studio Server Explorer

Hit the Save As button and open it in a local directory:

Figure 17. Download File from Blob Storage

Figure 18. abc.txt Contents
     Keep in mind that whenever you pass abc it will overwrite the existing abc.txt file in blob storage. This example could always be expanded to pass a file name parameter to get around this, if desired. If you pass parameter xyz for acronym, it will create a blob called acronym/xyz.txt, and so on.