To make the data drift solution work for Parquet, Data Collector first writes the data out in Avro format, then uses a MapReduce executor to kick off a MapReduce job, via YARN, that converts the Avro file to Parquet. It does this asynchronously, so the pipeline reports it has finished while the MapReduce job goes off to run on the cluster. This is all fine... if it works. But sometimes it doesn't. There are a myriad of reasons why your MapReduce job could fail, none of which matter at the moment you realize that the initial load of some monster table from the source system, which may have taken hours to days to pull down into Hadoop, now has to be run all over again. Well, my friend... put down the gun... we have one more shot to save the patient here.
For this fix, we are going to convert the file from Avro to Parquet "manually". To do this we are going to use the code from this GitHub project. The unfortunate thing is that this project is no longer maintained, and you will hit dependency issues when attempting to build it with Maven. So I had to do some surgery on the project and update some of the Avro and Hadoop dependencies to get it working on CDH 5.11. If you want to build the project yourself, here is the pom:
"1.0" encoding="UTF-8"xml version= <project xmlns="http://maven.apache.
org/POM/4.0.0" xmlns:xsi="http://www.w3.org/ 2001/XMLSchema-instance" xsi:schemaLocation="http:// maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/ maven-4.0.0.xsd">
<url>https://repository. cloudera.com/artifactory/ cloudera-repos/</url>
<!-- this is used for inheritance merges -->
<!-- bind to the packaging phase -->
If you are way too busy to be building Java projects, you can download the jar from here. So now that we have the jar built, here is a walkthrough of how to use it.
In this example we will use a pipeline that takes data from a single table in a relational database and moves it to an external table in Hive, a.k.a. creates a Parquet file in HDFS.
|Figure 1. Streamsets Pipeline|
When the pipeline finishes and the MapReduce executor kicks off, the conversion fails mid-flight and your HDFS directory ends up with something like this in it:
|Figure 2. Failed Parquet Conversion|
If the conversion had been successful, the Avro file would be gone (if you made that selection in the MapReduce executor) and you would see a file with a .parquet file extension. Instead, here you see the avro_to_parquet_tmp….. file, a relic of the failed MapReduce job. From here, we need to look at the Data Collector log for this pipeline to retrieve the YARN application id for the MapReduce job:
|Figure 3. Streamsets Data Collector Log|
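If you would rather grep for the application id than hunt through the log by eye, a small helper along these lines works (the sdc.log path in the usage comment is an assumption for a typical install; use wherever your Data Collector writes its logs):

```shell
# Pull the last YARN application id (application_<timestamp>_<sequence>)
# out of a log file.
extract_app_id() {
  grep -oE 'application_[0-9]+_[0-9]+' "$1" | tail -n 1
}

# Example (log path is an assumption for a default install):
# extract_app_id /var/log/sdc/sdc.log
```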
In that screen, filter on Username sdc and set the Text filter to the application id from the log:
|Figure 5. Hue Job Browser Application Filter|
Click on the log icon to access the logs for this failed job. On the syslog tab you should see something like this:
|Figure 6. Syslog for Application|
This is the JSON representation of the Avro schema for the file. We will need to copy this text:
Now that we have our Avro schema, we need to create an Avro schema file. For this example the table is clarity_dbo.order_results, so we will create a file called order_results.avsc, copy the Avro schema into it, convert the file to Unix line endings (in Notepad++: Edit->EOL Conversion->Unix/OSX Format), and upload it to a directory in HDFS like /schemas/clarity_dbo/order_results:
|Figure 7. Avro Schema File in HDFS|
|Figure 8. Avro Schema File Text|
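An .avsc file is just the schema JSON. As a sketch of the shape, here is what a hypothetical order_results.avsc could look like; the field names and types below are made up for illustration, so paste the schema you actually copied from the log instead:

```shell
# Write a hypothetical Avro schema file -- the fields here are
# illustrative only; use the real schema from the Data Collector log.
cat > order_results.avsc <<'EOF'
{
  "type": "record",
  "name": "order_results",
  "namespace": "clarity_dbo",
  "fields": [
    {"name": "order_id",     "type": ["null", "long"],   "default": null},
    {"name": "result_value", "type": ["null", "string"], "default": null}
  ]
}
EOF

# Then push it to HDFS (requires a configured hadoop client):
# hadoop fs -mkdir -p /schemas/clarity_dbo/order_results
# hadoop fs -put -f order_results.avsc /schemas/clarity_dbo/order_results/
```

Writing the file with a heredoc like this also sidesteps the Windows line-ending issue entirely.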
If you cannot find the schema in the logs, you can generate it with Avro Tools. The jar file is available here. Now, from the directory on your local Linux file system where you downloaded Avro Tools, run this command:
hadoop jar avro-tools-1.8.2.jar getschema /tmp/parquetfix/sdc-4dbc5be5-5809-4faa-9e58-8e9a8b77de84_4c546953-99fd-43f1-a1f4-2eea27b7bbe5 | hadoop fs -put -f - /schemas/clarity_dbo/order_results/order_results.avsc
1. getschema tells Avro Tools to extract the schema from...
2. The full path, with file name, to the Avro file we are generating the schema from
3. The path in Hadoop where we want to put the Avro schema file
Now we need to copy the Avro file from /sources/clarity_dbo/order_results to a directory we can use for this repair; in the example we use /tmp/parquetfix. (If there are any files already in that directory, you will need to delete them before attempting to repair!) Check the box next to the file and click Actions->Copy:
|Figure 9. HDFS Copy Screen|
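If you prefer a shell to the Hue UI for this step, the equivalent is roughly the following (the sdc-* filename pattern matches the example file above; substitute your actual Avro file name):

```shell
# Clear the repair working directory, then copy the orphaned Avro file in.
hadoop fs -rm -r -f -skipTrash /tmp/parquetfix
hadoop fs -mkdir -p /tmp/parquetfix
hadoop fs -cp /sources/clarity_dbo/order_results/sdc-* /tmp/parquetfix/
```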
The first part of the command calls the jar.
- The first parameter determines how much memory should be allocated to the job. In this example we give it 27GB of memory (which is overkill for this, but I had a lot of memory free on the server at the time). Make sure the job has enough memory to run or it will fail. You can see the current memory load for the server in Cloudera Manager->Hosts
- The second parameter is the path to the Avro schema file we created in the previous step
- The third parameter is the location of the Avro file we want to convert
- The fourth parameter is the location where we want the converted Parquet file(s) written. Make sure this directory doesn't already exist or the script will fail
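Putting those four parameters together, the invocation looks roughly like this. The jar name and the exact memory syntax here are assumptions based on the parameter list above, so check the usage output of the jar you built:

```shell
# Hypothetical invocation of the repair jar:
#   1: memory to allocate to the job (27GB here -- overkill, but safe)
#   2: the Avro schema file created earlier
#   3: the directory holding the Avro file to convert
#   4: the output directory for the Parquet file(s) -- must not exist yet
hadoop jar avro2parquet.jar 27g \
  /schemas/clarity_dbo/order_results/order_results.avsc \
  /tmp/parquetfix \
  /sources/clarity_dbo/order_results/fix
```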
Kick off the script. When it is done, hopefully, you should see a success message:
|Figure 11. Mapreduce Job Results|
Now we go to /sources/clarity_dbo/order_results/fix to see our parquet files:
|Figure 12. Parquet Files|
Check the checkbox next to every file with the .parquet extension, click Actions->Move, and enter the parent folder:
|Figure 13. Move Parquet Files|
Now that the Parquet files are there, you can delete the Avro file, the tmp Parquet file, and the fix directory. This should leave just the .parquet files. To verify the fix took, go to Hive and select some records from the table. If you can see the data now, congratulations!! You saved the data!
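The same verification can be done from a shell, assuming a hive client is on the box (the database and table names follow this example; substitute your own):

```shell
# Select a few rows; if data comes back, the repair worked.
hive -e 'SELECT * FROM clarity_dbo.order_results LIMIT 5;'
```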