Lead and Lag in Spark | Window Functions in PySpark | Lec-16
In this video I have talked about window functions in PySpark, and also about the difference between lead and lag. If you want to optimize your Spark workloads, you should have a solid understanding of this concept.
Directly connect with me on:- topmate.io/manish_kumar25
Data:-
product_data = [
(1,"iphone","01-01-2023",1500000),
(2,"samsung","01-01-2023",1100000),
(3,"oneplus","01-01-2023",1100000),
(1,"iphone","01-02-2023",1300000),
(2,"samsung","01-02-2023",1120000),
(3,"oneplus","01-02-2023",1120000),
(1,"iphone","01-03-2023",1600000),
(2,"samsung","01-03-2023",1080000),
(3,"oneplus","01-03-2023",1160000),
(1,"iphone","01-04-2023",1700000),
(2,"samsung","01-04-2023",1800000),
(3,"oneplus","01-04-2023",1170000),
(1,"iphone","01-05-2023",1200000),
(2,"samsung","01-05-2023",980000),
(3,"oneplus","01-05-2023",1175000),
(1,"iphone","01-06-2023",1100000),
(2,"samsung","01-06-2023",1100000),
(3,"oneplus","01-06-2023",1200000)
]
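As a plain-Python sanity check on this data (not PySpark; the column meanings product_id, product_name, sales_date, sales are assumed from context), here are the per-product 6-month totals and monthly percentage shares that the window-function answers in the comments should reproduce:

```python
from collections import defaultdict

product_data = [
    (1, "iphone", "01-01-2023", 1500000), (2, "samsung", "01-01-2023", 1100000),
    (3, "oneplus", "01-01-2023", 1100000), (1, "iphone", "01-02-2023", 1300000),
    (2, "samsung", "01-02-2023", 1120000), (3, "oneplus", "01-02-2023", 1120000),
    (1, "iphone", "01-03-2023", 1600000), (2, "samsung", "01-03-2023", 1080000),
    (3, "oneplus", "01-03-2023", 1160000), (1, "iphone", "01-04-2023", 1700000),
    (2, "samsung", "01-04-2023", 1800000), (3, "oneplus", "01-04-2023", 1170000),
    (1, "iphone", "01-05-2023", 1200000), (2, "samsung", "01-05-2023", 980000),
    (3, "oneplus", "01-05-2023", 1175000), (1, "iphone", "01-06-2023", 1100000),
    (2, "samsung", "01-06-2023", 1100000), (3, "oneplus", "01-06-2023", 1200000),
]

# Total sales per product over the 6 months
# (what sum("sales").over(Window.partitionBy("product_name")) yields on every row)
totals = defaultdict(int)
for pid, name, date, sales in product_data:
    totals[name] += sales

# Percentage contribution of each month to that product's 6-month total
pct = {
    (name, date): round(sales / totals[name] * 100, 2)
    for pid, name, date, sales in product_data
}

print(totals["iphone"])               # 8400000
print(pct[("iphone", "01-01-2023")])  # 17.86
```

These totals (8,400,000 / 7,180,000 / 6,925,000) match the output table posted further down in the comments.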
from pyspark.sql.functions import *
emp_data = [(1,'manish',50000,'IT','m'),
(2,'vikash',60000,'sales','m'),
(3,'raushan',70000,'marketing','m'),
(4,'mukesh',80000,'IT','m'),
(5,'priti',90000,'sales','f'),
(6,'nikita',45000,'marketing','f'),
(7,'ragini',55000,'marketing','f'),
(8,'rashi',100000,'IT','f'),
(9,'aditya',65000,'IT','m'),
(10,'rahul',50000,'marketing','m'),
(11,'rakhi',50000,'IT','f'),
(12,'akhilesh',90000,'sales','m')]
emp_schema=['id','name','salary','dept','gender']
emp_df = spark.createDataFrame(data=emp_data,schema=emp_schema)
emp_df = emp_df.select('id','name','salary','gender','dept')
emp_df.show()
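The lead/lag contrast the video covers can be illustrated without Spark. This is a plain-Python sketch, not PySpark's actual implementation; the sales sequence is the hypothetical monthly figures of one product, already ordered by month:

```python
# Plain-Python illustration of what lag/lead return over an ordered partition.
sales = [1500000, 1300000, 1600000]  # one product's sales, ordered by month

def lag(rows, offset=1, default=None):
    # value `offset` rows BEFORE the current row, like PySpark's lag(col, offset, default)
    return [rows[i - offset] if i - offset >= 0 else default for i in range(len(rows))]

def lead(rows, offset=1, default=None):
    # value `offset` rows AFTER the current row, like PySpark's lead(col, offset, default)
    return [rows[i + offset] if i + offset < len(rows) else default for i in range(len(rows))]

print(lag(sales))   # [None, 1500000, 1300000]
print(lead(sales))  # [1300000, 1600000, None]
```

Rows without a neighbour in the partition get the default value (None unless one is supplied), which is why the first lag and the last lead are null in the video's examples.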
Flight Data link:- github.com/databricks/Spark-T...
For more queries, reach out to me on my social media handles below.
Follow me on LinkedIn:- / manish-kumar-373b86176
Follow Me On Instagram:- / competitive_gyan1
Follow me on Facebook:- / manish12340
My Second Channel -- / @competitivegyan1
Interview series Playlist:- • Interview Questions an...
My Gear:-
Rode Mic:-- amzn.to/3RekC7a
Boya M1 Mic-- amzn.to/3uW0nnn
Wireless Mic:-- amzn.to/3TqLRhE
Tripod1 -- amzn.to/4avjyF4
Tripod2:-- amzn.to/46Y3QPu
camera1:-- amzn.to/3GIQlsE
camera2:-- amzn.to/46X190P
Pentab (Medium size):-- amzn.to/3RgMszQ (Recommended)
Pentab (Small size):-- amzn.to/3RpmIS0
Mobile:-- amzn.to/47Y8oa4 (You really shouldn't buy this one)
Laptop -- amzn.to/3Ns5Okj
Mouse+keyboard combo -- amzn.to/3Ro6GYl
21 inch Monitor-- amzn.to/3TvCE7E
27 inch Monitor-- amzn.to/47QzXlA
iPad Pencil:-- amzn.to/4aiJxiG
iPad 9th Generation:-- amzn.to/470I11X
Boom Arm/Swing Arm:-- amzn.to/48eH2we
My PC Components:-
intel i7 Processor:-- amzn.to/47Svdfe
G.Skill RAM:-- amzn.to/47VFffI
Samsung SSD:-- amzn.to/3uVSE8W
WD blue HDD:-- amzn.to/47Y91QY
RTX 3060Ti Graphic card:- amzn.to/3tdLDjn
Gigabyte Motherboard:-- amzn.to/3RFUTGl
O11 Dynamic Cabinet:-- amzn.to/4avkgSK
Liquid cooler:-- amzn.to/472S8mS
Antec Prizm FAN:-- amzn.to/48ey4Pj
Comments: 123
Bro, don't get demotivated. You are doing a great job. There are only a few channels on YT that actually teach you. You are one of them.
Hi Manish bhai, the way you explained Spark is just AMAZING! Please come up with a series on Apache KAFKA and AIRFLOW and their usage in Data Engineering, there is not much good tutorial present online on these 2 topics. It's a request Manish bhai! I am a working professional too, working on Spark and Python.
Thanks, brother, for the course. You are doing a great job.
Just finished the playlist. Amazing explanation. Please upload more videos ASAP!
Thanks for making this playlist. I have got a good knowledge on spark now. Hats off to your dedication.🙌
Thank you, sir, for sharing the data.
Thanks for the great and detailed content, Manish bhai. Great work!
Thanks for making amazing lectures!
Thank you for your guidance, Manish.
Thank you for posting videos in Hindi!!! It's always easier to learn in your mother tongue.
Sir, this is really a great video.
Nice ....
Manish bhai, you are doing great work. Don't get demotivated; you may not get a response immediately because some people watch this after a few months. So please keep going, and thank you for such precious content.
Very helpful
Manish bhai, you are the best teacher!!!!
You're a GEM of a person, Manish bhai ❤
I can understand Hindi, but it would be really helpful if you made videos in English so that we can understand the concepts in depth. I have learned so many things from your content. Just requesting you to make it in English if possible, or a separate English series for this playlist.
Go ahead, write 2-4 theory books on Spark 🤣🤣🤣. Actually, you mentioned a good point: until you practice, nothing will make sense; and if you don't understand, you won't have confidence; and without confidence, no selection. Great. It reminded me of David Robins, the Navy SEAL, world record holder, the toughest man in the world, and his statement: "If you're not getting it the first time, try a second time; still not getting it, try again, try again, try again, and that one point comes where your mind says, hey wait, I figured it out, I got it now." 🤗🤗🤗 Whatever the situation, you have to keep going again and again until you've mastered it. I also started preparing from your videos and finished just by watching; now on a second watch I'm practicing it and I'm getting it. Thanks, Manish, for your hard efforts.
For Q1:
window3 = Window.partitionBy("product_name")
last_6_month_df = product_df.withColumn("last_6_month_tot_sale", sum("sales").over(window3))\
    .withColumn("per_sales_each_mon", round((col("sales") / col("last_6_month_tot_sale")) * 100, 2))
last_6_month_df.show()
Note: this works for the product_data given in the description because each product has exactly 6 months of records; if there are fewer or more, we need rowsBetween, which you taught in later videos. Thanks.
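The rowsBetween caveat can be sketched in plain Python: a trailing frame of the current row plus up to five preceding rows (what Window.orderBy(...).rowsBetween(-5, 0) defines) sums only the rows actually available near the start of a partition. The helper name trailing_sum and the toy numbers are illustrative, not from the video:

```python
def trailing_sum(rows, width=6):
    # Mimics sum(...).over(Window.orderBy(...).rowsBetween(-(width - 1), 0)):
    # current row plus up to width-1 preceding rows, within one partition.
    return [sum(rows[max(0, i - width + 1):i + 1]) for i in range(len(rows))]

sales = [10, 20, 30, 40, 50, 60, 70]  # 7 months: one more row than the 6-row frame
print(trailing_sum(sales))  # [10, 30, 60, 100, 150, 210, 270]
```

Early rows sum only what exists so far, and from the 7th month onward the oldest month drops out of the frame, which a bare partitionBy total would not do.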
great Manish
Sir don't get demotivated or angry. You are a great teacher and all the lectures are easy to understand.❤❤❤ Please take care of your health.
Thanks for all the knowledge, Manish. Best series I have come across on PySpark. Great wishes for your efforts. Answer for the 1st question, plus finding the best sales month for each product:
window = Window.partitionBy("id")
product_df.withColumn("last6mothtotalsales", sum(col("sales_val")).over(window))\
    .withColumn("perLast6monthsales", round((col("sales_val")/col("last6mothtotalsales"))*100, 2))\
    .withColumn("best%SalesMonth", row_number().over(window.orderBy(col("perLast6monthsales").desc())))\
    .filter(col("best%SalesMonth") == 1).show()
Hi Manish, your content is amazing. How many more videos will come in the Spark series?
Q1. Answer:
window = Window.partitionBy("product_id")
last_six_month_df = product_df.withColumn("previous_six_month_total_sales", sum("sales").over(window))\
    .withColumn("perc_sales_each_month", round((col("sales")/col("previous_six_month_total_sales")) * 100, 2))
last_six_month_df.show()
I was asked the same question, and I was able to write it.
Thanks, Manish, for making the playlist. It would be more helpful if you taught in English, as some of us are non-Hindi listeners like me. Here is the answer:
product_window = Window.partitionBy("product_name")
product_df.withColumn("Total_sales", sum(col("sales")).over(product_window))\
    .withColumn("Percent_sales", (col("sales")/col("Total_sales")))\
    .show()
Bhaiya, the joke about writing a book was good 🤣. I'm doing full practicals. Thank you for the content!
1st question: using the DataFrame API we can't get the total sales using sum(), because when we use sum() as a window function we get a cumulative/running total; so we need to use groupBy and a join, like we did in the window-function class. I tried this during the window-function class and got this question here.
@ManishKumar I think an incorrect formula is being used in the video at 12:10 to calculate per_loss_gain. It should be divided by previous_month_sales, not by sales. In my view the correct formula is: Percentage Change = ((Current Sales − Previous Month Sales) / Previous Month Sales) × 100. Correct me if I am wrong. Eager to learn more from you 😊
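The formula point is easy to verify numerically with the iphone January and February figures from the data above; the two denominators give different percentages, and dividing by the previous month is the conventional month-over-month change:

```python
prev, cur = 1500000, 1300000  # iphone sales for January and February

pct_vs_previous = round((cur - prev) / prev * 100, 2)  # conventional month-over-month change
pct_vs_current = round((cur - prev) / cur * 100, 2)    # dividing by the current month instead

print(pct_vs_previous)  # -13.33
print(pct_vs_current)   # -15.38
```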
Can you please suggest the topics I should prepare apart from what you have shared till now? I have gone through all of your videos. I have an interview tomorrow :)
@manish_kumar_1
A year ago
Sorry I saw your comment today. Hope your interview went well!
@ashishkulshretha8332
A year ago
@@manish_kumar_1 Yes. I went through your other recordings. I have the CCA175 certification (Hadoop and Spark developer), so I am clear on the programming basics, but your videos prepared me for the topics beyond programming. Still, a roadmap of the topics we should learn for interviews would definitely help, since you are still in the process of making the videos. I expect the next round of interviews will happen within a week. If you can cover Spark Streaming and Kafka-related videos, it will help people; it looks like a lot of requirements are related to real-time data processing.
Can anyone tell me how to reset the Databricks password? I tried several times but I'm unable to resolve it and still can't log in.
Sir, can you please upload the product table data? In the description you have given only the employee data.
@manish_kumar_1
11 months ago
check now
@pranavlalwadia7359
11 months ago
Thanks
window = Window.partitionBy("product_id").orderBy(desc("sales_date"))
product_df.withColumn("privs_month_sales", lag(col("sales"), 1).over(window)).show()
Is this okay? For a better understanding of the lag() function I have used desc() to better see the previous months.
Next videos lecture
DOUBT: Sir, this data is month-wise, with only one record per month. What if we have one record per day? Should we then take the month of every record and order by month, or take only min(date) over (partition by month(date)) and then proceed like this?
Bhai, please start a Kafka series too, at least one video per day.
@shubne
A year ago
Yes, I'm with you. Manish, please make videos on Kafka as well.
Bhai, you should have put this data in this video's description; you've put it in the next one, and I only found it after a lot of searching.
@manish_kumar_1
11 months ago
Is it? Let me check
Done, brother, but you didn't teach lead, lag, and window functions in Spark SQL.
@manish_kumar_1
6 days ago
Really? I don't remember anymore which lecture covers what, but it should be there; otherwise I wouldn't have made the thumbnail like that.
Yo, great explanation. Thank you for your vids. Please don't get offended if not many people reply; I'm watching this after 3 months. Here's the answer:
perc_sales_6m = product_df.withColumn("total_sales_6m", sum(col("sales")).over(window_product))\
    .withColumn("perc_sales_6m", round(100*(col("sales")/col("total_sales_6m")), 2))\
    .show()
# Thank you for the amazing learning content!
It would be great if you made the videos in English.
@manish_kumar_1
5 months ago
Will upload in English also but that will take time
Answer for Q1:
window = Window.partitionBy(col('product_id'))
product_df.withColumn('total_sales', sum(col('sales')).over(window))\
    .withColumn('percentage', round(col('sales')*100/col('total_sales'), 2))\
    .show()
Ques 1 solution:
windowspec = Window.partitionBy(col("product_id"))  # setting the window based on product_id
product_df.withColumn("product_wise_total_sales", sum(col("sales")).over(windowspec))\
    .withColumn("%age_sale", round((col("sales")/col("product_wise_total_sales"))*100.00, 2))\
    .show()
One session on UDFs, please.
@manish_kumar_1
A year ago
Absolutely
1st question solution:
window = Window.partitionBy("product_id")
product_df.withColumn("sum_six_month_sales", sum("sales").over(window))\
    .withColumn("sales_per_6_month", round(col("sales") / col("sum_six_month_sales") * 100, 2))\
    .show()
Please check my solution for the question:
window = Window.partitionBy("product_id")
product_df = product_df.withColumn("total", sum("sales").over(window))
percentage_df = product_df.withColumn("percentage_of_total_sales", round(col("sales")/col("total")*100, 2))
percentage_df.show()
Manish sir, your teaching is fantastic. For the example above:
# Percentage of sale vs. the previous month, for the last 6 months: ((current / prev) - 1) * 100
win = Window.partitionBy("product").orderBy("date")
df.withColumn("previous_month_sale", lag(col("sale")).over(win)) \
    .withColumn("%_of_sale", ((col("sale")/col("previous_month_sale"))-1)*100)\
    .show()
Can you please explain in English?
window = Window.partitionBy("product_id").orderBy("sales_date")
# calculate the six-month sales as a 6-row window sum
last_six_month_sale = product_df.withColumn("last_six_month_sale", sum(col('Sales')).over(window.rowsBetween(0, 5)))
result = last_six_month_sale.withColumn("sales_percentage", (col("sales") - lag("last_six_month_sale").over(window)) / lag("last_six_month_sale").over(window))
result.show(truncate=False)
Hi sir, thanks a lot for the videos.
from pyspark.sql.window import Window
window = Window.partitionBy("brand")
product_df1 = product_df.withColumn("6_month_sales", sum(col("sales")).over(window))
product_df1.withColumn("percentage_of_sales", (col("sales")/col("6_month_sales"))*100).show()
next_month_df.withColumn("per_loss_gain", round(((col("sales")+col("next_month_sales"))/col("sales"))*100, 2)).show(truncate=False)
Please correct me if I'm wrong, Mr. Manish.
window = Window.partitionBy('product_name')
product_df = product_df.withColumn('total', sum('sales').over(window))
percentage_df = product_df.withColumn("percentageSales", round((product_df.sales/product_df.total)*100, 2))
percentage_df.show()
from pyspark.sql.functions import lag, lead, col, round
window = Window.partitionBy("id").orderBy("id")
df = df.withColumn("6_month_sales", sum("sales").over(window))
df = df.withColumn("comparison", round((col("sales")/col("6_month_sales"))*100, 2))
df.show()
Solution:
For Ques 1
product_window = Window.partitionBy('Product_name').orderBy('Sale_date')
product_df.withColumn('total_sale', sum(col('sales')).over(product_window))\
    .withColumn('percentage', round((col('total_sale')-col('sales'))*100/col('total_sale'), 2))\
    .show()
For Ques 2
product_window = Window.partitionBy('Product_name').orderBy('Sale_date')
product_df.withColumn('Previous_month_sale', lag(col('sales'), 1).over(product_window))\
    .withColumn('Gain/Loss', round((col('sales')-col('Previous_month_sale'))*100/col('sales'), 2))\
    .show()
w = Window.partitionBy("Product_ID")
Total_df = df.withColumn("Total", sum(col("Sales")).over(w))
Total_df.withColumn("Percen_Sales", round((col("Sales")*100)/col("Total"), 2)).show()
Sol for problem 1:
window = Window.partitionBy(col("product_id"))
product_df.withColumn("result", (col('sales_amount'))/(sum(col('sales_amount')).over(window))*100).show()
My answer:
window = Window.partitionBy(col("product_id")).orderBy("sales_date")
prev_sales_df = product_df.withColumn("prev_month_sales", lag(col("sales_amount"), 1, 0).over(window)) \
    .withColumn("prof_loss_amount", col("sales_amount")-col("prev_month_sales")) \
    .withColumn("per_loss_gain", round((col("prof_loss_amount")/col("prev_month_sales"))*100, 2))
@manish_kumar_1
A year ago
Please share the answer to the 1st question.
@ashutoshkumarsingh3337
A year ago
@@manish_kumar_1
window = Window.partitionBy(col("product_id"))
total_sales_df = product_df.withColumn("total_sales_month", sum(col("sales_amount")).over(window))
total_sales_df.withColumn("percentage_sales", round((col("sales_amount")/col("total_sales_month"))*100, 2)).show()
window = Window.partitionBy('product_name')
new_product_df = product_df.withColumn('Total_Sales', sum(col('sales')).over(window))\
    .withColumn('percentage_sale', (col('sales')/col('Total_Sales'))*100).show()
window = Window.partitionBy("product_id")
product_df.withColumn("six_mon_total", sum("sales").over(window))\
    .withColumn("percentage_each_month", round((col("sales")/col("six_mon_total")) * 100, 2)).show()
window1 = Window.partitionBy('brand')
p_df2 = p_df1.withColumn('total sales', sum(col('sales')).over(window1))
p_df2.withColumn('% sales', round((col('sales')/col('total sales'))*100, 2)).show()
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
window = Window.partitionBy("product_id").orderBy("sales_date").rowsBetween(-5, 0)
last_six_mnth_sales_df = product_df.withColumn("last_six_months_total_sales", sum("sales").over(window))
perc_df = last_six_mnth_sales_df.withColumn("perc_df", round(col("sales")/col("last_six_months_total_sales")*100, 2))
perc_df.show()
# % of sale each month based on last 6 months' sales
window = Window.partitionBy('product_name')
df_sales.withColumn('total_sale', sum(col('sales')).over(window))\
    .withColumn('percentage_sale_each_month', round((col('sales')) / (col('total_sale')), 2)*100).display()
Answer to the first question:
window = Window.partitionBy('product_id').orderBy('sales_date')
last_six_month_sale_df = product_df.withColumn('total_sales', lag(sum('sales').over(window), 0).over(window))\
    .withColumn('%_last_six_month', round((col('sales')/col('total_sales'))*100, 2))
last_six_month_sale_df.show()
window = Window.partitionBy('phone_cat')
df.withColumn('6_Months_Sales', sum('price').over(window))\
    .withColumn('Month_Sales_Percentage', round(100*(col('price')/col('6_Months_Sales')), 2))\
    .withColumn('Month_Sales_Percentage', concat('Month_Sales_Percentage', lit(' %')))\
    .select('id', 'phone_cat', 'date', 'price', 'Month_Sales_Percentage').show()
window = Window.partitionBy(col("product_id")).orderBy(col("sales_date"))
total_sale_df = product_df.withColumn("total_sale", sum(col("sales")).over(window))
total_sale_df.show()
total_sale_df.withColumn("percentage_sale", round((col("sales")/col("total_sale"))*100, 2)).show()
val win = Window.partitionBy("product_id")
val p_df = p.withColumn("total_sales", sum(col("sales")).over(win))
    .withColumn("sales_percentage", round((col("sales")/col("total_sales"))*100, 2))
p_df.show()
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
window = Window.partitionBy("id").orderBy("price").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df12 = prd_new1_df.withColumn("row_num", sum("price").over(window))\
    .withColumn("last_six", round((col("price")/(col("row_num"))*100))).show()
I think this is correct, please reply sir:
next_month_df.withColumn("per_loss_gain", round(((col("next_month_sales") - col("sales")) / col("sales")) * 100, 2)).show(truncate=False)
@spark_DE
7 months ago
💯
window = Window.partitionBy("product_id").orderBy("sales_date")
df_new = df.withColumn("last_6_month_sale", sum(col("sales")).over(window)) \
    .withColumn("%age_of_sales", round((col("sales")/col("last_6_month_sale"))*100, 2))
window = Window.partitionBy('product_id')
result_df = (
    product_df
    .withColumn('last_6_month_sale', sum('sales').over(window))
    .withColumn('%age_sale_from_last_6_month', round(col('sales') / col('last_6_month_sale') * 100, 2))
)
result_df.show()
from pyspark.sql.window import Window
from pyspark.sql.functions import *
window = Window.partitionBy('prd_name')
prd_df.withColumn('total_Sales', sum('sale').over(window))\
    .withColumn('percentage_sale', round(expr('(sale/total_Sales)*100'), 2)).display()
product_df.groupBy('product_name').agg(sum('sales').alias('total_sum'))\
    .withColumn("percentage", round(col("total_sum")*(1/225000), 2)).show()
window = Window.partitionBy("product_name").orderBy('sales_date')
last_six_month_df = product_df.withColumn('six_month_sale', sum(col('sales')).over(window))
last_six_month_df.withColumn('each_month_sale', round((col('sales')/col("six_month_sale"))*100, 2)).show()
@rpraveenkumar007
A year ago
Let me know, guys, if this is correct?
Answer for Q1:
sales_df = sales_df.withColumn('sales_date', from_unixtime(unix_timestamp('sales_date', 'dd-MM-yyyy')))
grouped_df = sales_df.groupBy('product_name', year('sales_date').alias('year'), month('sales_date').alias('month')).agg(sum('sales').alias('total_sales_monthly'))
sum_window = Window.partitionBy('product_name', 'year')
grouped_df.withColumn('total_sales', sum('total_sales_monthly').over(sum_window))\
    .withColumn('percent_month_sales_wrt_total', round(100*col('total_sales_monthly')/col('total_sales'), 2)).show()
Q1.
window = Window.partitionBy("product_name")
product_df.withColumn("total sum", sum(col("sales")).over(window))\
    .withColumn("%sale per month", round((col("sales")/col("total sum")*100), 2))\
    .show(truncate=False)
Q2.
window = Window.partitionBy("product_name").orderBy("sales")
product_df.withColumn("next month sale", lead(col("sales")).over(window))\
    .withColumn("%sale per month", round(((col("next month sale")-col("sales"))/col("sales")*100), 2))\
    .show(truncate=False)
wini = Window.partitionBy('product_id')
y = df_products.withColumn('total_byproduct', sum('price').over(wini))
y.withColumn('percent_sales_eachMonth', round((col('price')/col('total_byproduct'))*100, 2)).show()
winp = Window.partitionBy("product_name")
prod_df.withColumn("lst_6_mnth_pct", round((col("sales")/(sum("sales").over(winp))*100), 2)).show()
window1 = Window.partitionBy("product")  # .orderBy("date")
prev_df = sales_df.withColumn("sum_prod", sum("sales").over(window1))
prev_df.withColumn("per_sales", round((col("sum_prod")-col("sales"))/col("sum_prod")*100, 2)).show()
window = Window.partitionBy("mobile").orderBy(df["date"].asc())
df1 = df.withColumn("rn", row_number().over(window))
df2 = df1.filter(df1['rn']
One more way to solve:
windowspec = Window.partitionBy('product').orderBy('id')
df.withColumn('partition by sum', sum('sales').over(windowspec))\
    .withColumn('%tage based on last6 months', round((col('sales')/col('partition by sum'))*100, 2))\
    .show()
+---+-------+----------+-------+----------------+---------------------------+
| id|product|        dt|  sales|partition by sum|%tage based on last6 months|
+---+-------+----------+-------+----------------+---------------------------+
|  1| iphone|01-01-2023|1500000|         8400000|                      17.86|
|  1| iphone|01-02-2023|1300000|         8400000|                      15.48|
|  1| iphone|01-03-2023|1600000|         8400000|                      19.05|
|  1| iphone|01-04-2023|1700000|         8400000|                      20.24|
|  1| iphone|01-05-2023|1200000|         8400000|                      14.29|
|  1| iphone|01-06-2023|1100000|         8400000|                       13.1|
|  3|oneplus|01-01-2023|1100000|         6925000|                      15.88|
|  3|oneplus|01-02-2023|1120000|         6925000|                      16.17|
|  3|oneplus|01-03-2023|1160000|         6925000|                      16.75|
|  3|oneplus|01-04-2023|1170000|         6925000|                       16.9|
|  3|oneplus|01-05-2023|1175000|         6925000|                      16.97|
|  3|oneplus|01-06-2023|1200000|         6925000|                      17.33|
|  2|samsung|01-01-2023|1100000|         7180000|                      15.32|
|  2|samsung|01-02-2023|1120000|         7180000|                       15.6|
|  2|samsung|01-03-2023|1080000|         7180000|                      15.04|
|  2|samsung|01-04-2023|1800000|         7180000|                      25.07|
|  2|samsung|01-05-2023| 980000|         7180000|                      13.65|
|  2|samsung|01-06-2023|1100000|         7180000|                      15.32|
+---+-------+----------+-------+----------------+---------------------------+
window = Window.partitionBy("product_id")
last_6_month = product_df.withColumn("total_sales", sum(col("sales")).over(window))
last_6_month.show()
last_6_month.withColumn("percentage_sales", round(((col("sales"))/col("total_sales"))*100, 2)).show()
window = Window.partitionBy('product_id')
six_month_sales_df = product_df.withColumn('6 months sales', sum(col('sales')).over(window))
six_month_sales_df.withColumn('% of sales to 6 months', round((col('sales')/col('6 months sales')) * 100, 2))\
    .show()
window = Window.partitionBy('product_id')
last_months_df = ll_df.withColumn('last_6_mon_sales', sum('sales').over(window))
last_months_df.withColumn('percen_sales_6_mon', round((col('sales')/col('last_6_mon_sales'))*100, 2)).show()