# Scala Data Pipelines @ Spotify

Slide 0

Scala Data Pipelines @ Spotify Neville Li @sinisa_lyh

Slide 1

Who am I? ‣ Spotify NYC since 2011 ‣ Formerly Yahoo! Search ‣ Music recommendations ‣ Data infrastructure ‣ Scala since 2013

Slide 2

Spotify in numbers

• Started in 2006, 58 markets
• 75M+ active users, 20M+ paying
• 30M+ songs, 20K new per day
• 1.5 billion playlists
• 1 TB logs per day
• 1200+ node Hadoop cluster
• 10K+ Hadoop jobs per day

Slide 3

Music recommendation @ Spotify

• Discover Weekly
• Radio
• Related Artists
• Discover Page

Slide 4

Recommendation systems

Slide 5

A little teaser

Crunch: CombineFns are used to represent the associative operations…

```java
PGroupedTable<K,V>::combineValues(CombineFn<K,V> combineFn, CombineFn<K,V> reduceFn)
```

Scalding: reduce with fn which must be associative and commutative…

```scala
Grouped[K, +V]::reduce[U >: V](fn: (U, U) => U)
```

Spark: Merge the values for each key using an associative reduce function…

```scala
PairRDDFunctions[K, V]::reduceByKey(fn: (V, V) => V)
```

Slide 6

Monoid! Enables map-side reduce. Actually it’s a semigroup.
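A map-side reduce only needs an associative operation: each mapper can pre-aggregate its own partition and the reducer merges the partials. A minimal plain-Scala sketch (the chunks stand in for mapper partitions; no framework involved):

```scala
// Associativity is what lets each mapper pre-aggregate before the shuffle:
// reducing per-chunk partials gives the same result as one big reduce.
object MapSideReduce {
  def main(args: Array[String]): Unit = {
    val values = (1 to 100).toList
    val plus: (Int, Int) => Int = _ + _  // associative: a semigroup op

    // "Reducer-only": everything reduced in one place
    val reducerOnly = values.reduce(plus)

    // "Map-side": each of 4 partitions emits a partial, then merge partials
    val partials = values.grouped(25).map(_.reduce(plus)).toList
    val mapSide = partials.reduce(plus)

    assert(reducerOnly == mapSide)
    println(mapSide)  // 5050
  }
}
```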

Slide 7

One more teaser

Linear equation in Alternating Least Squares (ALS) matrix factorization:

x_u = (YᵀY + Yᵀ(C_u − I)Y)⁻¹ Yᵀ C_u p(u)

```scala
vectors.map { case (id, v) => (id, v * v) }.map(_._2).reduce(_ + _)  // YtY

ratings.keyBy(fixedKey).join(outerProducts)  // YtCuIY
  .map { case (_, (r, op)) =>
    (solveKey(r), op * (r.rating * alpha))
  }.reduceByKey(_ + _)

ratings.keyBy(fixedKey).join(vectors)  // YtCupu
  .map { case (_, (r, v)) =>
    val (cui, pui) = (r.rating * alpha + 1, if (r.rating > 0.0) 1.0 else 0.0)
    (solveKey(r), v * (cui * pui))
  }.reduceByKey(_ + _)
```

http://www.slideshare.net/MrChrisJohnson/scala-data-pipelines-for-music-recommendations

Slide 8

Success story

• Mid 2013: 100+ Python Luigi M/R jobs, few tests
• 10+ new hires since, most fresh grads
• Few with Java experience, none with Scala
• Now: 300+ Scalding jobs, 400+ tests
• More ad-hoc jobs untracked
• Spark also taking off

Slide 9

First 10 months ……

Slide 10

Activity over time

Slide 11

Guess how many jobs written by yours truly?

Slide 12

Performance vs. Agility https://nicholassterling.wordpress.com/2012/11/16/scala-performance/

Slide 13

Let’s dive into something technical

Slide 14

To join or not to join?

```scala
val streams: TypedPipe[(String, String)] = _  // (track, user)
val tgp: TypedPipe[(String, String)] = _      // (track, genre)

streams
  .join(tgp)
  .values  // (user, genre)
  .group
  .mapValueStream(vs => Iterator(vs.toSet))  // reducer-only
```
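To see what the shuffle actually computes, the same join + group can be mimicked on in-memory collections. This is a plain-Scala sketch with made-up data, not the Scalding API:

```scala
// In-memory model of the pipeline above: (track, user) joined with
// (track, genre) on track, then grouped by user into a set of genres.
object JoinSketch {
  def main(args: Array[String]): Unit = {
    val streams = List(("t1", "alice"), ("t2", "alice"), ("t1", "bob"))  // (track, user)
    val tgp     = List(("t1", "rock"), ("t2", "metal"))                  // (track, genre)

    // the join: match on track, keep (user, genre)
    val joined = for {
      (t1, user)  <- streams
      (t2, genre) <- tgp
      if t1 == t2
    } yield (user, genre)

    // the group: user -> set of genres
    val userGenres: Map[String, Set[String]] =
      joined.groupBy(_._1).map { case (u, gs) => u -> gs.map(_._2).toSet }

    assert(userGenres("alice") == Set("rock", "metal"))
    assert(userGenres("bob") == Set("rock"))
    println(userGenres)
  }
}
```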

Slide 15

Hash join

```scala
val streams: TypedPipe[(String, String)] = _  // (track, user)
val tgp: TypedPipe[(String, String)] = _      // (track, genre)

streams
  .hashJoin(tgp.forceToDisk)  // tgp replicated to all mappers
  .values  // (user, genre)
  .group
  .mapValueStream(vs => Iterator(vs.toSet))  // reducer-only
```

Slide 16

CoGroup

```scala
val streams: TypedPipe[(String, String)] = _  // (track, user)
val tgp: TypedPipe[(String, String)] = _      // (track, genre)

streams
  .cogroup(tgp) { case (_, users, genres) =>
    users.map((_, genres.toSet))
  }          // (track, (user, genres))
  .values    // (user, genres)
  .group
  .reduce(_ ++ _)  // map-side reduce!
```

Slide 17

CoGroup

```scala
val streams: TypedPipe[(String, String)] = _  // (track, user)
val tgp: TypedPipe[(String, String)] = _      // (track, genre)

streams
  .cogroup(tgp) { case (_, users, genres) =>
    users.map((_, genres.toSet))
  }          // (track, (user, genres))
  .values    // (user, genres)
  .group
  .sum  // SetMonoid[Set[T]] from Algebird
        // sum[U >: V](implicit sg: Semigroup[U])
```

Slide 18

Key-value file as distributed cache

```scala
val streams: TypedPipe[(String, String)] = _  // (track, user)
val tgp: SparkeyManager = _  // tgp replicated to all mappers

streams
  .map { case (track, user) =>
    (user, tgp.get(track).split(",").toSet)
  }
  .group
  .sum
```

SparkeyManager wraps DistributedCacheFile
https://github.com/spotify/sparkey
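Setting Sparkey itself aside, the map-side lookup pattern can be sketched with an ordinary Map standing in for the replicated key-value file (data and names below are made up):

```scala
// A plain Map plays the role of the Sparkey file replicated to every mapper:
// each (track, user) record becomes (user, genre set) via a local lookup,
// so no shuffle is needed for the "join".
object CacheLookupSketch {
  def main(args: Array[String]): Unit = {
    val tgp = Map("t1" -> "rock,indie", "t2" -> "metal")  // stand-in for SparkeyManager
    val streams = List(("t1", "alice"), ("t2", "bob"))    // (track, user)

    val result = streams.map { case (track, user) =>
      (user, tgp(track).split(",").toSet)
    }

    assert(result.contains(("alice", Set("rock", "indie"))))
    assert(result.contains(("bob", Set("metal"))))
    println(result)
  }
}
```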

Slide 19

Joins and CoGroups

• Require shuffle and reduce step
• Some ops force everything to reducers, e.g. mapGroup, mapValueStream
• CoGroup more flexible for complex logic
• Scalding flattens a.join(b).join(c)… into MultiJoin(a, b, c, …)

Slide 20

Distributed cache

• Faster with off-heap binary files
• Building cache = more wiring
• Memory mapping may interfere with YARN
• E.g. 64GB nodes with 48GB for containers (no cgroup)
• 12 × 2GB containers each with 2GB JVM heap + mmap cache
• OOM and swap!
• Keep files small (< 1GB) or fall back to joins…

Slide 21

Analyze your jobs

• Concurrent Driven
• Visualize job execution
• Workflow optimization
• Bottlenecks
• Data skew

Slide 22

Not enough math?

Slide 23

Recommending tracks

• User listened to Rammstein - Du Hast
• Recommend 10 similar tracks
• 40-dimension feature vectors for tracks
• Compute cosine similarity between all pairs
• O(n) lookup per user where n ≈ 30M
• Try that with 50M users × 10 seed tracks each
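Cosine similarity is the dot product divided by the product of the norms, and the brute-force approach scores the seed vector against every track in the catalog. A minimal sketch with made-up 2-dimensional vectors standing in for the 40-dimensional ones:

```scala
object CosineSketch {
  // cos(a, b) = (a · b) / (|a| * |b|)
  def cosine(a: Array[Double], b: Array[Double]): Double = {
    val dot   = a.zip(b).map { case (x, y) => x * y }.sum
    val normA = math.sqrt(a.map(x => x * x).sum)
    val normB = math.sqrt(b.map(x => x * x).sum)
    dot / (normA * normB)
  }

  def main(args: Array[String]): Unit = {
    val seed = Array(1.0, 0.0)
    val catalog = Map("t1" -> Array(2.0, 0.0), "t2" -> Array(0.0, 3.0))

    // O(n) per seed: score every track, rank by similarity
    val ranked = catalog.toList.sortBy { case (_, v) => -cosine(seed, v) }
    assert(ranked.head._1 == "t1")  // the parallel vector ranks first
    println(ranked.map(_._1))
  }
}
```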

Slide 24

ANNOY - cheat by approximation

• Approximate Nearest Neighbors Oh Yeah
• Random projections and binary tree search
• Build index on single machine
• Load in mappers via distributed cache
• O(log n) lookup

https://github.com/spotify/annoy
https://github.com/spotify/annoy-java

Slide 25

ANN Benchmark https://github.com/erikbern/ann-benchmarks

Slide 26

Filtering candidates

• Users don’t like seeing artists/albums/tracks they already know
• But may forget what they listened to long ago
• 50M users × thousands of items each
• Over 5 years of streaming logs
• Need to update daily
• Need to purge old items per user

Slide 27

Options

• Aggregate all logs daily
• Aggregate last x days daily
• CSV of artist/album/track ids
• Bloom filters

Slide 28

Decayed value with cutoff

• Compute new user-item score daily
• Weighted on context, e.g. radio, search, playlist
• score’ = score + previous × 0.99
• half-life = log_0.99 0.5 ≈ 69 days
• Cut off at top 2000
• Items that users might remember seeing recently
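With a daily decay factor of 0.99, the half-life solves 0.99^t = 0.5, i.e. t = log_0.99 0.5 ≈ 69 days. A quick check of that number together with the update rule:

```scala
object DecaySketch {
  def main(args: Array[String]): Unit = {
    val decay = 0.99
    // 0.99^t = 0.5  =>  t = ln(0.5) / ln(0.99) ≈ 69 days
    val halfLife = math.log(0.5) / math.log(decay)
    assert(math.round(halfLife) == 69)

    // score' = score + previous * 0.99, applied daily
    def update(todayScore: Double, previous: Double): Double =
      todayScore + previous * decay

    // an item not seen again for 69 days has roughly halved in weight
    val after69 = (1 to 69).foldLeft(1.0) { (acc, _) => update(0.0, acc) }
    assert(math.abs(after69 - 0.5) < 0.01)
    println(f"half-life = $halfLife%.1f days, weight after 69 days = $after69%.3f")
  }
}
```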

Slide 29

Bloom filters

• Probabilistic data structure
• Encodes a set of items with m bits and k hash functions
• No false negatives
• Tunable false-positive probability
• Size proportional to capacity & FP probability
• Let’s build one per user-{artists,albums,tracks}
• Algebird BloomFilterMonoid: z = all zero bits, + = bitwise OR
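A toy Bloom filter makes the monoid structure visible: z is the empty bit set and + on two filters is bitwise OR. This is an illustrative sketch, not Algebird's implementation:

```scala
object BloomSketch {
  // Toy BF: `bits` holds the indices of set bits out of m positions,
  // and each item sets k positions derived from its hashes.
  final case class BF(bits: Set[Int], m: Int, k: Int) {
    private def hashes(x: String): Seq[Int] =
      (0 until k).map(i => math.abs((x, i).hashCode) % m)
    def +(x: String): BF = copy(bits = bits ++ hashes(x))
    def maybeContains(x: String): Boolean = hashes(x).forall(bits)
    def |(that: BF): BF = copy(bits = bits ++ that.bits)  // bitwise OR of bit sets
  }

  def main(args: Array[String]): Unit = {
    val zero = BF(Set.empty, m = 1024, k = 3)   // the monoid's z
    val a = zero + "Rammstein" + "Du Hast"
    val b = zero + "Radiohead"
    val merged = a | b                          // the monoid's +
    // no false negatives: every inserted item is always found
    assert(merged.maybeContains("Rammstein"))
    assert(merged.maybeContains("Du Hast"))
    assert(merged.maybeContains("Radiohead"))
    println(s"${merged.bits.size} bits set")
  }
}
```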

Slide 30

Size versus max items & FP prob

• User-item distribution is uneven
• Assuming the same setting for all users:
• # items << capacity → wasting space
• # items > capacity → high FP rate
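The standard sizing formulas make this trade-off concrete: for capacity n at false-positive probability p, a BF needs m = −n·ln p / (ln 2)² bits and k = (m/n)·ln 2 hash functions, assuming uniform hashes. The numbers below (capacity 2000, 1% FP) are illustrative:

```scala
object BFSizing {
  // m = -n * ln(p) / (ln 2)^2
  def bitsNeeded(n: Int, p: Double): Long =
    math.ceil(-n * math.log(p) / (math.log(2) * math.log(2))).toLong

  // k = (m / n) * ln 2
  def optimalK(m: Long, n: Int): Int =
    math.round(m.toDouble / n * math.log(2)).toInt

  def main(args: Array[String]): Unit = {
    val m = bitsNeeded(2000, 0.01)  // capacity 2000 items, 1% FP
    val k = optimalK(m, 2000)
    println(s"$m bits, $k hash functions")
    assert(k == 7)
    // size grows if capacity or FP tightness grows:
    assert(bitsNeeded(2000, 0.001) > m)
    assert(bitsNeeded(20000, 0.01) > m)
  }
}
```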

Slide 31

Scalable Bloom Filter

• Growing sequence of standard BFs
• Increasing capacity and tighter FP probability
• Most users have few BFs
• Power users have many
• Serialization and lookup overhead
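The growth policy can be sketched with a counting stand-in for the actual filter; the capacities and growth factor below are illustrative, not a real scalable-BF implementation:

```scala
object ScalableBFSketch {
  // Stand-in for a BF: only tracks item count against capacity.
  final case class Filter(capacity: Int, count: Int = 0) {
    def full: Boolean = count >= capacity
  }

  // A scalable BF appends a bigger filter whenever the current one fills up.
  final case class ScalableBF(filters: List[Filter], growth: Int = 10) {
    def add: ScalableBF = filters match {
      case head :: tail if !head.full =>
        copy(filters = head.copy(count = head.count + 1) :: tail)
      case _ =>
        val cap = filters.headOption.map(_.capacity * growth).getOrElse(1000)
        copy(filters = Filter(cap, count = 1) :: filters)
    }
  }

  def main(args: Array[String]): Unit = {
    val bf = (1 to 1500).foldLeft(ScalableBF(Nil))((b, _) => b.add)
    // the n=1k filter fills up, then an n=10k filter is appended
    assert(bf.filters.map(_.capacity) == List(10000, 1000))
    assert(bf.filters.head.count == 500)
    println(bf.filters)
  }
}
```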

Slide 32

Scalable Bloom Filter (animation over slides 32-35: items fill a BF with n=1k; once it is full a BF with n=10k is appended, then n=100k, then n=1m as each predecessor fills up)

Slide 36

Opportunistic Bloom Filter

• Build n BFs of increasing capacity in parallel
• Up to << N max possible items
• Keep smallest one with capacity > items inserted
• Expensive to build
• Cheap to store and look up
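The selection step is simple once the candidates are built: keep the smallest one whose capacity exceeds the number of items actually inserted. The capacities and item count below mirror the slides' example:

```scala
object OpportunisticBFSketch {
  def main(args: Array[String]): Unit = {
    // candidate BFs built in parallel, in increasing capacity
    val capacities = List(1000, 10000, 100000, 1000000)
    val itemsInserted = 60000  // this user's actual item count

    // keep the smallest candidate with capacity > items inserted
    val kept = capacities.find(_ > itemsInserted)
    assert(kept == Some(100000))  // n=100k, at 60% utilization
    println(s"keep n=${kept.get}, utilization = ${itemsInserted * 100 / kept.get}%")
  }
}
```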

Slide 37

Opportunistic Bloom Filter (animation over slides 37-57: BFs with n=1k, 10k, 100k, 1m are built in parallel; items fill the n=1k and n=10k candidates to 100%; the n=100k candidate ends at 60% utilization and, as the smallest with capacity above the item count, is kept; the n=1m candidate sits under-utilized at 6% and is discarded)

Slide 58

Want more scala.language.experimental?

Slide 59

Track metadata

• Label dump → content ingestion
• Third-party track genres, e.g. GraceNote
• Audio attributes, e.g. tempo, key, time signature
• Cultural data, e.g. popularity, tags
• Latent vectors from collaborative filtering
• Many sources for album, artist, user metadata too

Slide 60

Multiple data sources

• Big joins
• Complex dependencies
• Wide rows with few columns accessed
• Wasting I/O

Slide 61

Apache Parquet

• Pre-join sources into mega-datasets
• Store as Parquet columnar storage
• Column projection
• Predicate pushdown
• Avro within Scalding pipelines

Slide 62

Projection

```scala
pipe.map(a => (a.getName, a.getAmount))
// versus
Parquet.project[Account]("name", "amount")
```

• Strings → unsafe and error-prone
• No IDE auto-completion → finger injury
• my_fancy_field_name → .getMyFancyFieldName
• Hard to migrate existing code

Slide 63

Predicate

```scala
pipe.filter(a => a.getName == "Neville" && a.getAmount > 100)
// versus
FilterApi.and(
  FilterApi.eq(FilterApi.binaryColumn("name"), Binary.fromString("Neville")),
  FilterApi.gt(FilterApi.floatColumn("amount"), 100f.asInstanceOf[java.lang.Float]))
```

Slide 64

Macro to the rescue

Code → AST → (pattern matching) → (recursion) → (quasi-quotes) → Code

```scala
Projection[Account](_.getName, _.getAmount)
Predicate[Account](x => x.getName == "Neville" && x.getAmount > 100)
```

https://github.com/nevillelyh/parquet-avro-extra
http://www.lyh.me/slides/macros.html

Slide 65

What else? ‣ Analytics ‣ Ads targeting, prediction ‣ Metadata quality ‣ Zeppelin ‣ More cool stuff in the works

Slide 66

And we’re hiring

Slide 67

The End Thank You
