Dealing with truncated data


#1

Hi

I have a daily job that processes log files which have been split into JSON files of no more than 50MB each, compressed and then ‘baldrised’ (http://oobaloo.co.uk/baldr-a-record-oriented-file-format-lib-in-clojure). These files are then decompressed, joined and opened into an RDD using Sparkling.

When everything is working correctly, I can expect to extract 5 billion rows of data. Recently, though, some of the files seem to have been cut off, possibly due to corruption. Unfortunately, this causes the entire job to fail: because of the way the JSON is structured, it finds the first half of the keys but not the last one.

Here is a simplified version of the data:

{:result
	 {:offers
	  [{:flight 0,
	    :totalprice 557.8,
	    :totalpriceexcltax 257,
	    :ticket 1,
	    :score 1276.4}
	   {:flight 1,
	    :totalprice 557.8,
	    :totalpriceexcltax 257,
	    :ticket 1,
	    :score 1318.9}
	   {:flight 2,
	    :totalprice 557.8,
	    :totalpriceexcltax 257,
	    :ticket 1,
	    :score 1243.9}
	  ],
  
  :suppliers
  [{:supplier "KLM",:orig 0,:dest 1,:offers [0 1 2 3  4 5], :group 1}
   {:supplier "AirFrance", :orig 0, :dest 1, :offers [48 49 50 51 52 53 54 55], :group 1}],
  :flights
  [{:segments [0 1]}
   {:segments [0 2]}
   {:segments [0 3]}
 ],
  :segments
  [{:legs [0 1 2], :duration 1000}
   {:legs [3 4 5], :duration 995}
   {:legs [6 7 8], :duration 1080}
   {:legs [6 7 5], :duration 930}],
  :legs
  [{:stops 0,
    :airline 0,
    :stopovers nil,
    :depart "201503191225",
    :duration 120,
    :orig 0,
    :flightno 1654,
    :dest 2,
    :arrive "201503191425"}
   {:stops 0,
    :airline 0,
    :stopovers nil,
    :depart "201503191640",
    :duration 580,
    :orig 2,
    :flightno 621,
    :dest 3,
    :arrive "201503192120"}
   ],
  :airports
  ["VCE"
   "CLT"
   "AMS"
   "ATL"
   "DTW"
   ]}}

In order to pull out all of the data, I first need to start with offer index 0, which gives me the flight index and the supplier. The supplier data also contains the index for the origin and destination. Using the flight index I can get the segments, which give me the leg indexes and the airline indexes.
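For illustration, the index chain described above might be walked roughly like this. It is only a sketch: first-leg-origin is a hypothetical name, not one of the helpers used in the job, and it assumes that a leg's :orig is an index into :airports. Because get-in returns nil when an index is missing, a lookup against cut-off data yields nil rather than an exception.

```clojure
;; Cut-down version of the sample data above.
(def flight-search
  {:result {:flights  [{:segments [0]}]
            :segments [{:legs [0 1], :duration 1000}]
            :legs     [{:orig 0, :dest 2} {:orig 2, :dest 3}]
            :airports ["VCE" "CLT" "AMS" "ATL"]}})

(defn first-leg-origin
  "Hypothetical helper: follows flight -> first segment -> first leg -> :orig
  and returns the airport code, or nil if any link in the chain is missing."
  [flight-search flight-index]
  (let [result        (:result flight-search)
        segment-index (get-in result [:flights flight-index :segments 0])
        leg-index     (get-in result [:segments segment-index :legs 0])
        airport-index (get-in result [:legs leg-index :orig])]
    (get-in result [:airports airport-index])))

(first-leg-origin flight-search 0) ;; => "VCE"
(first-leg-origin flight-search 7) ;; => nil (no such flight)
```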

As I need to loop through all the offers, I count the offers and use that as the range in a for, so the main function that pulls all the data out looks like this:

(defn all-flight-records [flight-search]
  (for [offer-index (range 0 (count (-> flight-search :result :offers)))]
    (let [flight-index (get-in flight-search [:result :offers offer-index :flight])]
      {:outboundorigin      (outbound-origin flight-search flight-index)
       :outbounddestination (outbound-destination flight-search flight-index)
       :outbounddeparture   (outbound-dept-time flight-search flight-index)
       :outboundarrival     (outbound-arrival-time flight-search flight-index)
       :inboundorigin       (if (> (segment-count flight-search flight-index) 1)
                              (inbound-origin flight-search flight-index))
       :inbounddestination  (if (> (segment-count flight-search flight-index) 1)
                              (inbound-destination flight-search flight-index))
       :inbounddeparture    (if (> (segment-count flight-search flight-index) 1)
                              (inbound-dept-time flight-search flight-index))
       :inboundarrival      (if (> (segment-count flight-search flight-index) 1)
                              (inbound-arrival-time flight-search flight-index))
       :inboundduration     (if (> (segment-count flight-search flight-index) 1)
                              (inbound-duration flight-search flight-index))
       :outboundduration    (outbound-duration flight-search flight-index)
       :outboundstops       (outbound-stops flight-search flight-index)
       :inboundstops        (if (> (segment-count flight-search flight-index) 1)
                              (inbound-stops flight-search flight-index))
       :searchid            (get-in flight-search [:result :searchid])
       :supplier            (supplier flight-search offer-index)
       :inbound-airlines    (if (> (segment-count flight-search flight-index) 1)
                              (inbound-airlines flight-search flight-index))
       :outbound-airlines   (outbound-airlines flight-search flight-index)
       :totalprice          (->> (get-in flight-search [:result :offers offer-index])
                                 :totalprice)
       :totalduration       (total-duration flight-search flight-index)
       :tickettype          (if (nil? (get-in flight-search [:request :segments 1 :depart]))
                              "OW" "RT")})))

I’m guessing that truncated data is a common issue, and was wondering if there is a way for the job to skip any rows where it can’t find the indexed position and carry on, rather than failing and throwing away all of the data it could have processed?

Any help on this would be appreciated.

Thanks

Ben


#2

Hi @draven72,

Thanks for the question. I’ve dealt with big data in JSON before, and sometimes the data would get truncated. I’m assuming the truncation breaks the entire JSON parse, so it’s impossible to proceed. The simplest way to avoid problems with truncation is to have one JSON map per line. That is, instead of

[ obj1, obj2, obj3 ]

You do

obj1
obj2
obj3

And make sure there are no line breaks inside the objects. Then it’s just a matter of splitting on newlines. If any line doesn’t parse as JSON, you’ll know it’s a truncation issue, but most of the records will still have gone through. You just have to throw away the ones that don’t parse. (And make a note of them, if you like).
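Here is a rough sketch of that idea. parse-lines is a made-up name, and the parser is passed in as an argument so the example runs without extra dependencies. The demo uses clojure.edn/read-string as a stand-in; in the real job you would pass whatever JSON parser you already use.

```clojure
(require '[clojure.edn :as edn]
         '[clojure.string :as str])

(defn parse-lines
  "Parses each non-blank line of text with parse-fn. A line whose parse
  throws is collected under :bad-lines instead of aborting the batch."
  [parse-fn text]
  (reduce (fn [acc line]
            (try
              (update acc :records conj (parse-fn line))
              (catch Exception _
                (update acc :bad-lines conj line))))
          {:records [] :bad-lines []}
          (remove str/blank? (str/split-lines text))))

;; Three records, one per line; the last one has been cut off.
(def sample
  "{:flight 0, :ticket 1}\n{:flight 1, :ticket 1}\n{:flight 2, :tick")

(parse-lines edn/read-string sample)
;; => {:records   [{:flight 0, :ticket 1} {:flight 1, :ticket 1}]
;;     :bad-lines ["{:flight 2, :tick"]}
```

Keeping the bad lines around, rather than silently dropping them, makes it easy to log how much of each file was lost.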

Now you’ve basically got some of the data you need. You’ll want your for expression to filter out what it can’t find. I don’t fully understand your code, but I think I would approach it like this:

(defn all-flight-records [flight-search]
  (for [offer-index (range 0 (count (-> flight-search :result :offers)))
        :let [flight-index (get-in flight-search [:result :offers offer-index :flight])]
        :when flight-index]
      ...

for and doseq accept a few modifiers in their binding vector, including :let and :when (there is also :while). :let makes a let binding, as you’re used to with the let expression. :when filters out elements that don’t meet the condition. They’re just skipped. For instance, to get only even numbers:

(for [x (range 100)
      :when (even? x)]
  x)
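Applied to data shaped like yours, the same pattern might look like the sketch below. I’m guessing at the structure here: complete-offers is a hypothetical name, and the check goes one step further than the snippet above by also requiring that the flight the offer points at actually exists, so offers whose flight was lost to truncation are skipped.

```clojure
(def truncated-search
  {:result {:offers  [{:flight 0, :totalprice 557.8}
                      {:flight 1, :totalprice 557.8}
                      {:flight 2, :totalprice 557.8}]
            ;; :flights was cut off after the second entry
            :flights [{:segments [0 1]}
                      {:segments [0 2]}]}})

(defn complete-offers
  "Returns one record per offer whose flight is present in :flights.
  Offers with a missing :flight or a dangling flight index are skipped."
  [flight-search]
  (for [offer (get-in flight-search [:result :offers])
        :let  [flight-index (:flight offer)
               flight       (get-in flight-search [:result :flights flight-index])]
        :when flight]
    {:flight     flight-index
     :segments   (:segments flight)
     :totalprice (:totalprice offer)}))

(complete-offers truncated-search)
;; => ({:flight 0, :segments [0 1], :totalprice 557.8}
;;     {:flight 1, :segments [0 2], :totalprice 557.8})
```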

I hope that helps. Let me know if it doesn’t.

Eric


#3

Thanks Eric, I’ll give that a go.

Ben


#4

Something you may find useful is the book “Clojure Data Analysis Cookbook” by Eric Rochester, published by Packt. It may give you some ideas on alternative ways to parse your data. While this won’t solve the truncation problem, it may provide ideas on how to handle such issues. It may also show more efficient processing that reduces the number of times you need to iterate over the input structure.


#5

Thanks @theophilusx, I’ll add this to my library