Scala Data Pipelines @ Spotify



Slide 0

Scala Data Pipelines @ Spotify
Neville Li  @sinisa_lyh


Slide 1

Who am I?
‣ Spotify NYC since 2011
‣ Formerly Yahoo! Search
‣ Music recommendations
‣ Data infrastructure
‣ Scala since 2013


Slide 2

Spotify in numbers
• Started in 2006, 58 markets
• 75M+ active users, 20M+ paying
• 30M+ songs, 20K new per day
• 1.5 billion playlists
• 1 TB logs per day
• 1200+ node Hadoop cluster
• 10K+ Hadoop jobs per day


Slide 3

Music recommendation @ Spotify
• Discover Weekly
• Radio
• Related Artists
• Discover Page


Slide 4

Recommendation systems


Slide 5

A little teaser

Crunch: CombineFns are used to represent the associative operations…
  PGroupedTable<K,V>::combineValues(CombineFn<K,V> combineFn, CombineFn<K,V> reduceFn)

Scalding: reduce with fn which must be associative and commutative…
  Grouped[K, +V]::reduce[U >: V](fn: (U, U) => U)

Spark: Merge the values for each key using an associative reduce function…
  PairRDDFunctions[K, V]::reduceByKey(fn: (V, V) => V)
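To make the common thread concrete, a minimal Spark sketch, assuming an existing SparkContext `sc` (and a Spark version where pair-RDD functions need no extra import): because the function is associative, each partition can pre-aggregate before the shuffle.

  // assumes an existing SparkContext `sc`
  val plays = sc.parallelize(Seq(("track1", 1), ("track2", 1), ("track1", 1)))

  // `_ + _` is associative, so partial sums are computed on the map side
  // of the shuffle and only the partial results are sent to the reducers.
  val playCounts = plays.reduceByKey(_ + _)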


Slide 6

Monoid!
Enables map-side reduce.
(Actually it’s a semigroup.)
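A minimal Algebird sketch of the point (the MaxMin type is a hypothetical example, not from the deck): all a framework needs for map-side combining is an associative plus.

  import com.twitter.algebird.Semigroup

  // Hypothetical example type: track max and min play counts together.
  case class MaxMin(max: Long, min: Long)

  // Associative plus: (a + b) + c == a + (b + c), so partial results can be
  // combined on the mappers before the shuffle and merged on the reducers.
  implicit val maxMinSemigroup: Semigroup[MaxMin] =
    Semigroup.from { (a, b) => MaxMin(math.max(a.max, b.max), math.min(a.min, b.min)) }

In Scalding, grouped.sum picks up such an implicit semigroup and does exactly this partial aggregation.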


Slide 7

One more teaser

Linear equation in Alternating Least Squares (ALS) matrix factorization:
x_u = (YᵀY + Yᵀ(Cᵘ − I)Y)⁻¹ YᵀCᵘ p(u)

vectors.map { case (id, v) => (id, v * v) }.map(_._2).reduce(_ + _) // YtY

ratings.keyBy(fixedKey).join(outerProducts) // YtCuIY
  .map { case (_, (r, op)) =>
    (solveKey(r), op * (r.rating * alpha))
  }.reduceByKey(_ + _)

ratings.keyBy(fixedKey).join(vectors) // YtCupu
  .map { case (_, (r, v)) =>
    val Cui = r.rating * alpha + 1
    val pui = if (Cui > 0.0) 1.0 else 0.0
    (solveKey(r), v * (Cui * pui))
  }.reduceByKey(_ + _)

http://www.slideshare.net/MrChrisJohnson/scala-data-pipelines-for-music-recommendations
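For context, the equation is the standard implicit-feedback ALS update (Hu, Koren, Volinsky); the symbol definitions below are the conventional ones and are inferred from the code, not spelled out on the slide:

  % Y    : item factor matrix (one 40-dimension row per track)
  % C^u  : diagonal confidence matrix for user u, c_{ui} = 1 + \alpha r_{ui}
  % p(u) : binary preference vector, p_{ui} = 1 iff c_{ui} > 0
  x_u = \left( Y^\top Y + Y^\top (C^u - I)\, Y \right)^{-1} Y^\top C^u \, p(u)

The three pipelines above compute the YᵀY, Yᵀ(Cᵘ − I)Y and YᵀCᵘp(u) terms respectively, each ending in an associative reduce.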


Slide 8

Success story
• Mid 2013: 100+ Python Luigi M/R jobs, few tests
• 10+ new hires since, most fresh grads
• Few with Java experience, none with Scala
• Now: 300+ Scalding jobs, 400+ tests
• More ad-hoc jobs untracked
• Spark also taking off


Slide 9

First 10 months ……


Slide 10

Activity over time


Slide 11

Guess how many jobs written by yours truly?


Slide 12

Performance vs. Agility https://nicholassterling.wordpress.com/2012/11/16/scala-performance/


Slide 13

Let’s dive into something technical


Slide 14

To join or not to join?

val streams: TypedPipe[(String, String)] = _ // (track, user)
val tgp: TypedPipe[(String, String)] = _     // (track, genre)

streams
  .join(tgp)
  .values // (user, genre)
  .group
  .mapValueStream(vs => Iterator(vs.toSet)) // reducer-only


Slide 15

Hash join

val streams: TypedPipe[(String, String)] = _ // (track, user)
val tgp: TypedPipe[(String, String)] = _     // (track, genre)

streams
  .hashJoin(tgp.forceToDisk) // tgp replicated to all mappers
  .values // (user, genre)
  .group
  .mapValueStream(vs => Iterator(vs.toSet)) // reducer-only


Slide 16

CoGroup

val streams: TypedPipe[(String, String)] = _ // (track, user)
val tgp: TypedPipe[(String, String)] = _     // (track, genre)

streams
  .cogroup(tgp) { case (_, users, genres) =>
    users.map((_, genres.toSet))
  }               // (track, (user, genres))
  .values         // (user, genres)
  .group
  .reduce(_ ++ _) // map-side reduce!


Slide 17

CoGroup

val streams: TypedPipe[(String, String)] = _ // (track, user)
val tgp: TypedPipe[(String, String)] = _     // (track, genre)

streams
  .cogroup(tgp) { case (_, users, genres) =>
    users.map((_, genres.toSet))
  }       // (track, (user, genres))
  .values // (user, genres)
  .group
  .sum    // SetMonoid[Set[T]] from Algebird
          // sum[U >: V](implicit sg: Semigroup[U])


Slide 18

Key-value file as distributed cache

val streams: TypedPipe[(String, String)] = _ // (gid, user)
val tgp: SparkeyManager = _ // tgp replicated to all mappers

streams
  .map { case (track, user) =>
    (user, tgp.get(track).split(",").toSet)
  }
  .group
  .sum

https://github.com/spotify/sparkey
SparkeyManager wraps DistributedCacheFile
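For a sense of what SparkeyManager wraps, a rough sketch of the underlying sparkey-java usage; this is an assumption based on my reading of the project's README, so check the repo for the exact method names. The index is written once, shipped via the distributed cache, then memory-mapped read-only in each mapper.

  import java.io.File
  import com.spotify.sparkey.Sparkey

  // NOTE: API usage here is an assumption based on the sparkey-java README.
  val indexFile = new File("track-genres.spi")

  // Build the index once, e.g. in the job that materializes track -> genres.
  val writer = Sparkey.createNew(indexFile)
  writer.put("track123", "rock,metal")
  writer.writeHash()
  writer.close()

  // In each mapper: open (memory-mapped) and look up.
  val reader = Sparkey.open(indexFile)
  val genres = Option(reader.getAsString("track123")).map(_.split(",").toSet)
  reader.close()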


Slide 19

Joins and CoGroups
• Require shuffle and reduce step
• Some ops force everything to reducers, e.g. mapGroup, mapValueStream
• CoGroup more flexible for complex logic
• Scalding flattens a.join(b).join(c)… into MultiJoin(a, b, c, …)


Slide 20

Distributed cache
• Faster with off-heap binary files
• Building cache = more wiring
• Memory mapping may interfere with YARN
• E.g. 64GB nodes with 48GB for containers (no cgroup)
• 12 × 2GB containers, each with 2GB JVM heap + mmap cache
• OOM and swap!
• Keep files small (< 1GB) or fall back to joins…


Slide 21

Analyze your jobs
• Concurrent Driven
• Visualize job execution
• Workflow optimization
• Bottlenecks
• Data skew


Slide 22

Not enough math?


Slide 23

Recommending tracks
• User listened to Rammstein - Du Hast
• Recommend 10 similar tracks
• 40-dimension feature vectors for tracks
• Compute cosine similarity between all pairs (see sketch below)
• O(n) lookup per user where n ≈ 30m
• Try that with 50m users * 10 seed tracks each
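A minimal sketch of the per-pair computation in plain Scala (not the production code): cosine similarity between two 40-dimension track vectors.

  // Cosine similarity between two dense feature vectors of equal length.
  def cosine(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length)
    val dot = a.zip(b).map { case (x, y) => x * y }.sum
    val normA = math.sqrt(a.map(x => x * x).sum)
    val normB = math.sqrt(b.map(x => x * x).sum)
    dot / (normA * normB)
  }

Running this against ~30m candidate tracks per seed is what makes the brute-force approach infeasible, hence the approximation on the next slide.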


Slide 24

ANNOY - cheat by approximation
• Approximate Nearest Neighbor Oh Yeah
• Random projections and binary tree search
• Build index on single machine
• Load in mappers via distributed cache
• O(log n) lookup

https://github.com/spotify/annoy
https://github.com/spotify/annoy-java


Slide 25

ANN Benchmark https://github.com/erikbern/ann-benchmarks


Slide 26

Filtering candidates
• Users don’t like seeing artists/albums/tracks they already know
• But they may forget what they listened to long ago
• 50m users * thousands of items each
• Over 5 years of streaming logs
• Need to update daily
• Need to purge old items per user


Slide 27

Options
• Aggregate all logs daily
• Aggregate last x days daily
• CSV of artist/album/track ids
• Bloom filters


Slide 28

Decayed value with cutoff
• Compute new user-item score daily
• Weighted on context, e.g. radio, search, playlist
• score’ = score + previous * 0.99
• half-life = log_0.99(0.5) ≈ 69 days
• Cut off at top 2000 (see sketch below)
• Items that users might remember seeing recently
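A minimal, self-contained sketch of one day's update for a single user (plain Scala with a hypothetical helper, not the production Scalding job): decay yesterday's scores, add today's, keep the top 2000.

  val decay = 0.99 // 0.99^t = 0.5  =>  t = ln(0.5) / ln(0.99) ≈ 69 days half-life

  // previous: accumulated item -> score; today: item -> score from today's logs.
  def updateScores(previous: Map[String, Double],
                   today: Map[String, Double]): Map[String, Double] = {
    val decayed = previous.map { case (item, score) => item -> score * decay }
    val merged = (decayed.keySet ++ today.keySet).map { item =>
      item -> (decayed.getOrElse(item, 0.0) + today.getOrElse(item, 0.0))
    }.toMap
    // cut off at the top 2000 items the user might remember seeing recently
    merged.toSeq.sortBy { case (_, score) => -score }.take(2000).toMap
  }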


Slide 29

Bloom filters
• Probabilistic data structure
• Encoding set of items with m bits and k hash functions
• No false negatives
• Tunable false positive probability
• Size proportional to capacity & FP probability
• Let’s build one per user-{artists,albums,tracks}
• Algebird BloomFilterMonoid: z = all zero bits, + = bitwise OR (sketch below)
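A small sketch of the Algebird usage; the capacity and FP rate are arbitrary example values, and the exact factory signature is my assumption from the Algebird API of that era.

  import com.twitter.algebird.BloomFilter

  // Monoid for BFs sized for ~100k items at 1% false positives (example values).
  val bfMonoid = BloomFilter(100000, 0.01)

  // One filter per user over their item ids; zero = all-zero bits, plus = bitwise OR,
  // so a user's partial filters from different log shards can simply be summed
  // (e.g. with Scalding's .sum over the monoid).
  val day1 = bfMonoid.create("artist:1", "album:2", "track:3")
  val day2 = bfMonoid.create("track:3", "track:4")
  val userBf = bfMonoid.plus(day1, day2)

  // Membership is probabilistic: no false negatives, tunable false positives.
  val maybeSeen = userBf.contains("track:3") // ApproximateBoolean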


Slide 30

Size versus max items & FP prob
• User-item distribution is uneven
• Assuming same setting for all users
• # items << capacity → wasting space
• # items > capacity → high FP rate
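For reference, the textbook sizing relations behind this trade-off (standard Bloom filter math, not from the slide): for target capacity n and false-positive probability p, the optimal bit count m and hash count k are

  m = -\frac{n \ln p}{(\ln 2)^2}, \qquad k = \frac{m}{n} \ln 2

so the filter size grows linearly with the capacity you provision, which is exactly why one fixed setting either wastes space on light users or blows up the FP rate for heavy ones.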


Slide 31

Scalable Bloom Filter
• Growing sequence of standard BFs
• Increasing capacity and tighter FP probability
• Most users have few BFs
• Power users have many
• Serialization and lookup overhead


Slide 32

Scalable Bloom Filter (animation over slides 32-35): items are inserted into a growing sequence of BFs; when the n=1k filter is full, an n=10k filter is appended, then n=100k, then n=1m.


Slide 36

Opportunistic Bloom Filter
• Building n BFs of increasing capacity in parallel
• Up to << N max possible items
• Keep smallest one with capacity > items inserted (see sketch below)
• Expensive to build
• Cheap to store and lookup
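A toy sketch of the idea in plain Scala with Algebird (not Spotify's implementation; the capacities and FP rate are example values): build candidate filters of increasing capacity over the same items, then keep the smallest one that isn't over capacity.

  import com.twitter.algebird.{BF, BloomFilter}

  val capacities = List(1000, 10000, 100000, 1000000) // n = 1k, 10k, 100k, 1m
  val fpProb = 0.01

  // Build all candidate filters over the same items, then keep the smallest one
  // whose capacity exceeds the number of items actually inserted.
  def opportunisticBF(items: Seq[String]): BF = {
    val candidates = capacities.map(n => (n, BloomFilter(n, fpProb).create(items: _*)))
    candidates
      .collectFirst { case (n, bf) if items.size <= n => bf }
      .getOrElse(candidates.last._2) // all over capacity: fall back to the largest
  }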


Slide 37

Opportunistic Bloom Filter (animation over slides 37-57): items are inserted into four BFs of capacity n=1k, 10k, 100k and 1m in parallel. Early on only the n=1k filter is substantially full (~80%) while the larger ones are nearly empty (~8%, ~0.8%, ~0.08%); as items keep arriving, the n=1k and n=10k filters fill up completely while the n=100k filter reaches ~60% and the n=1m filter only ~6%. The n=100k filter is kept (smallest with capacity > items inserted); the n=1m filter would be under-utilized.


Slide 58

Want more scala.language.experimental?


Slide 59

Track metadata
• Label dump → content ingestion
• Third party track genres, e.g. GraceNote
• Audio attributes, e.g. tempo, key, time signature
• Cultural data, e.g. popularity, tags
• Latent vectors from collaborative filtering
• Many sources for album, artist, user metadata too


Slide 60

Multiple data sources
• Big joins
• Complex dependencies
• Wide rows with few columns accessed
• Wasting I/O


Slide 61

Apache Parquet
• Pre-join sources into mega-datasets
• Store as Parquet columnar storage
• Column projection
• Predicate pushdown
• Avro within Scalding pipelines


Slide 62

Projection

pipe.map(a => (a.getName, a.getAmount))
versus
Parquet.project[Account]("name", "amount")

• Strings → unsafe and error prone
• No IDE auto-completion → finger injury
• my_fancy_field_name → .getMyFancyFieldName
• Hard to migrate existing code


Slide 63

Predicate

pipe.filter(a => a.getName == "Neville" && a.getAmount > 100)
versus
FilterApi.and(
  FilterApi.eq(FilterApi.binaryColumn("name"), Binary.fromString("Neville")),
  FilterApi.gt(FilterApi.floatColumn("amount"), 100f.asInstanceOf[java.lang.Float]))


Slide 64

Macro to the rescue

Code → AST → (pattern matching) → (recursion) → (quasi-quotes) → Code

Projection[Account](_.getName, _.getAmount)
Predicate[Account](x => x.getName == "Neville" && x.getAmount > 100)

https://github.com/nevillelyh/parquet-avro-extra
http://www.lyh.me/slides/macros.html


Slide 65

What else?
‣ Analytics
‣ Ads targeting, prediction
‣ Metadata quality
‣ Zeppelin
‣ More cool stuff in the works


Slide 66

And we’re hiring


Slide 67

The End Thank You

