למה לפעמים ספארק ”עצלן”, ואיך תפתרו את אתגרי הביצועים בחישובים מבוזרים?

איך משתמשים בספארק, למה הוא לא מפעיל מיד את החישובים על המידע – וכל מה שצריך לדעת על פתרון הקסם שנקרא caching (ולמה לא תמיד משתלם להשתמש בו)

טכנולוגיית עיבוד מבוזר מספקת פתרון לעיבוד כמויות עצומות של מידע (צילום: Dreamstime)

מאת אופק הוד, Data Engineer, אוטונומו

הגידול המואץ בכמויות המידע שאנחנו מייצרים מצריך כלים מתאימים שיאפשרו לנו לשמור, לעבד ולהפיק מסקנות בצורה פשוטה ונגישה ככל הניתן. ספארק (Apache Spark) הוא אחד הפרויקטים המובילים בשוק היום שמאפשרים לענות על צרכים אלו בעזרת טכנולוגיית עיבוד מבוזר.

עיבוד מבוזר, ספארק ומה שביניהם

טכנולוגיית עיבוד מבוזר מספקת פתרון לעיבוד כמויות עצומות של מידע, כאלו ששרת בודד לא יכול להתמודד איתן. הקונספט מתבסס על גיוס אוסף שרתים שיעבדו כיחידה אחת (cluster) על מנת לחלק ביניהם את כמות החישובים הכבדה שיש לבצע. אפשר לחשוב על תהליך שכזה כפירוק של משימה אחת גדולה לאוסף משימות קטנות, שיבוצעו במקביל על הרבה שרתים וכך נוכל בסוף לסיים את המשימה הגדולה בזמן סביר. כל היופי המתוחכם הזה יקרה מאחורי הקלעים – אנחנו פשוט נפנה לדלפק המרכזי ונעביר לו הוראות פשוטות, והוא כבר ידאג לחשוב בשבילנו בגדול.

בספארק מרכיבים “משימות” באמצעות קוד או באמצעות שפת SparkSQL שמזכירה מאוד SQL קלאסי של מאגרי מידע רלציוניים, אך מאחורי הקלעים הן רצות באופן מבוזר. כשמגדירים לספארק מבנה של “משימה”, הוא מייצר אובייקט שמגדיר אותה. כשהיא רצה הוא יודע לפרק אותה לאוסף משימות קטנות (Task), ובסופו של דבר לבצע חלוקה בין הרבה שרתים שיעבדו אותן במקביל. Task זו יחידת העיבוד הקטנה ביותר בספארק, אך ישנן אבסטרקציות נוספות כגון Stage ו-Job שלא נתייחס אליהן כאן.

ישנם שלושה סוגי אובייקטים שמייצגים משימות בספארק: DataFrame ,Dataset ו-RDD. לרוב לא נרצה להשתמש ב-RDD, ונעבוד עם DataFrame או Dataset. הסיבה היא ששני אלו מבוססי סכמה (בדומה לטבלת SQL) ומנצלים את מנוע ה-SparkSQL שלו יתרונות רבים, בהם אוסף אופטימיזציות פנימיות שיגרמו לעיבודים שלנו לרוץ מהר יותר. לצורך העניין, במאמר זה אדגים את הנושא באמצעות DataFrame.

אז איך משתמשים בספארק, למה ואיפה באה לידי ביטוי ה”עצלנות” שלו ומה הם האתגרים המשתמעים מכך?

למה ומתי ספארק מתגלה כ”עצלן”?

תהליך בניית ה-DataFrame בספארק מחולק לשני שלבים עיקריים:

1. קוראים לאוסף טרנספורמציות, כגון בחירת שדות, באמצעות הפונקציה select או סינון רשומות באמצעות הפונקציה filter.

2. מבצעים פעולה כלשהי, כגון ספירת כמות הרשומות עם הפונקציה count או שמירת הרשומות עם הפונקציה write.

לדוגמה, נייצר DataFrame שמביא מידע בפורמט Parquet מ-S3 ומבצע סינון לפי השדה id (קוד פייתון):

df = spark.read.parquet("s3://data/path/input").filter("id = 1")

יצרנו משתנה בשם df המכיל אובייקט מסוג DataFrame, ונראה שבשלב זה ספארק קרא את הרשומות מ-S3, סינן אותן לפי השדה id=1 והחזיר את התוצאה למשתנה df. הרי זה מה שקורה בכל קוד שקורא מקבצים ומבצע סינונים – נשמע הגיוני, לא? ובכן, טעות.

בניית DataFrame בספארק מתבצעת באופן “עצלן” (lazy evaluation). זה אומר שכל טרנספורמציה שנוסיף על גבי DataFrame לא תפעיל את החישובים על המידע, אלא רק תזכור מה צריך לעשות. החישובים יקרו בפועל רק כשנקרא לפעולה כלשהי.

נמשיך: כעת ברצוננו לספור את כמות הרשומות שעבורן id=1:

count = df.count()

הפעם, כמצופה, הפעלנו פעולת count ולכן ספארק באמת ניגש ל- S3, קרא את המידע, ביצע סינון רק עבור הרשומות שמקיימות את התנאי id=1 והחזיר לנו את כמות הרשומות שנמצאו אל תוך המשתנה count.

כעת ספירת הרשומות הללו לא מספיקה לנו, ואנחנו רוצים גם לשמור אותן ב-S3:

df.write.parquet("s3://data/path/output")

מה קרה הפעם? הפעלנו עוד פעולת write, ולכן ספארק קרא את הרשומות מ-S3, סינן אותן לפי השדה id=1 וכתב את התוצאה ל-S3 בנתיב חדש.

נראה טוב? חדי העין יעלו שאלה מתבקשת: ספארק ביצע פעמיים את תהליך קריאת המידע מ-S3 וטרנספורמציית סינון הרשומות: פעם ראשונה לטובת הפעולה count ופעם שנייה לטובת הפעולה write. לא חבל על המאמץ הכפול? האם ספארק לא חכם מספיק כדי לקרוא את המידע ולבצע את הסינון בפעם אחת?

תמיד כשנקרא לפונקציה המוגדרת כפעולה (action), נעביר לספארק מסר של “ביצעתי פעולה ואני רוצה את התוצאה שלה עכשיו”, וכתוצאה מכך נגרום לו מיד לבצע את החישובים. נוסף על כך, חוץ מהתוצאה הסופית של הפעולה שביצענו – כמו המספר הסופי בפעולת count או הקבצים שישמרו ב-S3 בפעולת write – חישובי הטרנספורמציות לאורך ה-Job מוגדרים כנדיפים ולכן אין להם אופציה לגישה חוזרת. זאת הסיבה שספארק לא יכול “לזכור” את התוצאות של הטרנספורמציות ולהשתמש בהן בפעולה הבאה.

אבל מתברר שיש לנו דרך לרמוז לספארק לחרוג ממנהגו ולשמור את התוצאות של הטרנספורמציות בצד, כי ממש בעוד רגע נקרא לפעולה נוספת ופשוט חבל על הזמן של החישובים החוזרים. הדרך הזאת נקראת caching – בעזרת הפונקציה cache או persist על DataFrame, ניתן להגיד לספארק לשמור את התוצאות של הטרנספורמציות של הפעולה הראשונה שרצה במקום נגיש. כך הפעולות הבאות יוכלו לדלג על החישובים החוזרים, ולשלוף את המידע הרלוונטי ישירות מהמקום הנגיש.

אז למה ספארק לא מבצע בשבילנו Caching באופן אוטומטי?

לא תמיד “משתלם” לבצע caching, גם אם יש חזרה על אותו ה-DataFrame. אחת הסיבות לכך היא שהחישובים החוזרים אינם תמיד זהים לחלוטין, מפני שספארק מבצע אופטימיזציות עבור כל פעולה בנפרד. לדוגמה, עבור פעולת ה-count שלנו ספארק יוכל לקרוא מראש רק לשדה אותו אנחנו מסננים וגם “לדחוף” את הפעולה הזאת למערכת הקבצים (projection pushdown). המשמעות היא שבמקום להביא את כל תוכן הקבצים מ-S3 שכולל בתוכו את כל השדות, ספארק יוכל להביא אליו ישירות רק את הערכים של השדה id מהחלקים הרלוונטיים בקבצי ה-Parquet, לבצע את הסינון והספירה, ובכך להקטין משמעותית את כמות המידע שהוא טוען אליו ולחסוך זמן ריצה.

אופטימיזציה כזאת אינה יכולה להתבצע עבור פעולת ה-write, שבשבילה לא מספיק להביא רק את השדה שאותו מסננים, אלא צריך להביא את כל השדות כדי לשמור אותם בסוף ב- S3. במאמר המקיף הזה תוכלו למצוא פירוט עמוק לגבי האופן שבו cache עובד בספארק, מתי כדאי להשתמש בו ומהם האתגרים שנתקלנו בהם בביצוע מיגרציה לתהליכים ב-scale גבוה ומשתנה.

בזכות הדרך שעברנו כעת עם ספארק, ראינו שאמנם ה”עצלנות” שלו מאפשרת לאגד עבורנו אוסף טרנספורמציות ולהריץ אותן יחד ביעילות, אך באותו הזמן היא מחביאה מאיתנו את העובדה ששימוש חוזר באותו ה-DataFrame מאלץ אותנו לבצע חישובים כפולים מאחורי הקלעים, גם אם לא התכוונו לכך. כשעובדים עם כמויות מידע משמעותיות, מדובר בכפילות של חישובים ארוכים, כבדים ויקרים.

ספארק נוהג לחשוב בגדול בשבילנו, ובזכות כך אנחנו כל כך אוהבים אותו, אך זאת לעולם לא תהיה סיבה להוריד את הידיים מהמושכות ולסמוך עליו בעיניים עצומות. אז בפעם הבאה שאנחנו מתכוונים לסמוך על מוצר גדול שיעשה את כל העבודה בשבילנו, תזכרו שאפילו ספארק לעתים בוחר להיות עצלן.

הכתבה בחסות OTONOMO

חברת הסטארטאפ אוטונומו בנתה זירת מסחר ופלטפורמת איסוף מידע מרכבים מקושרים שמשרתת יצרניות רכב, ציי רכב ויותר מ-100 נותני שירותים בתעשיית התחבורה. הפלטפורמה של אוטונומו מעבדת באופן מאובטח יותר מ-4 מיליארד פריטי מידע מדי יום, שנאספים מיותר מ-40 מיליון רכבים מקושרים מסביב לעולם. בימים אלה מגייסת החברה עובדים למרכז הפיתוח בהרצליה.

כתב אורח

אנחנו מארחים מפעם לפעם כותבים טכנולוגים אורחים, המפרסמים כתבות בתחומי התמחות שלהם. במידה ואתם מעוניינים לפרסם פוסט בשמכם, פנו אלינו באמצעות טופס יצירת קשר באתר.

הגב

3 תגובות על "למה לפעמים ספארק ”עצלן”, ואיך תפתרו את אתגרי הביצועים בחישובים מבוזרים?"

avatar
Photo and Image Files
 
 
 
Audio and Video Files
 
 
 
Other File Types
 
 
 

* היי, אנחנו אוהבים תגובות!
תיקונים, תגובות קוטלות וכמובן תגובות מפרגנות - בכיף.
חופש הביטוי הוא ערך עליון, אבל לא נוכל להשלים עם תגובות שכוללות הסתה, הוצאת דיבה, תגובות שכוללות מידע המפר את תנאי השימוש של Geektime, תגובות שחורגות מהטעם הטוב ותגובות שהן בניגוד לדין. תגובות כאלו יימחקו מייד.

סידור לפי:   חדש | ישן | הכי מדורגים
גיא
Guest

הרבה פעמים ה Cache של Spark לא מספיק או לא יעיל וכדאי לשקול להוסיף את Redis.
https://github.com/RedisLabs/spark-redis/

דורי
Guest

שווה להוסיף התייחסות בין טרנפורמציה לינארית ובין טרנפורמציה של כמה מקורות מידע כמו join של 2 דאטהפריים ,נניח ויהיה כישלון באחד התהליכים לאחר ה join כדאי לשמור לcache או דיסק את התוצאה מיד לאחר ה join על מנת לחסוך ריצה מחדש שתבצע שוב פעם join בין ה דאטהפריים

david
Guest

מי משתמש בספארק ?? חחח

wpDiscuz

תגיות לכתבה: