pyspark - Why does my Datastax Spark app use unusual amounts of memory?
My systems have 32 GB of memory. I have 3 machines; they currently show 4 GB, 10 GB, and 14 GB of free memory respectively, and free memory keeps decreasing. The system has been running for 44 hours, memory usage is high and keeps increasing, and this will eventually lead to a crash.
Here is a picture of the Spark executors:
This PySpark code uses more and more memory the longer it runs, which I don't want because the system will crash. Any suggestions on how to change this?
begin = datetime.now(tz)
print 'begin:%s' % (begin)
article_channels = articlestat.join(channels).map(lambda x:(x[1][0]['id'],{'id':x[1][0]['id'],'thumbnail':x[1][0]['thumbnail'],'title':x[1][0]['title'],'url':x[1][0]['url'],'created_at':x[1][0]['created_at'],'source':x[1][0]['source'],'category':x[1][0]['category'],'author':x[1][1]['name']}))
speed_rdd = axes.map(lambda x:(x.article,[[x.at,x.comments,x.likes,x.reads,x.shares]])) \
    .reduceByKey(lambda x,y:x+y) \
    .filter(lambda x:len(x[1])>=2) \
    .map(lambda x:(x[0],sorted(x[1],key=lambda y:y[1],reverse=True)[0],sorted(x[1],key=lambda y:y[1],reverse=True)[1])) \
    .filter(lambda x:(x[1][0]-x[2][0]).seconds>0) \
    .map(lambda x:(x[0],{'id':x[0],'comments':x[1][1],'likes':x[1][2],'reads':x[1][3],'shares':x[1][4],'speed':5*300*((x[1][1]-x[2][1])/((x[1][0]-x[2][0]).seconds/60.0))})) \
    .filter(lambda x:x[1]['comments']>0)
statistics = article_channels.join(speed_rdd) \
    .map(lambda x:{'id':x[1][0]['id'],'thumbnail':x[1][0]['thumbnail'],'title':x[1][0]['title'],'url':x[1][0]['url'],'created_at':x[1][0]['created_at'],'source':x[1][0]['source'],'category':x[1][0]['category'],'author':x[1][0]['author'],'comments':x[1][1]['comments'],'likes':x[1][1]['likes'],'reads':x[1][1]['reads'],'shares':x[1][1]['shares'],'speed':x[1][1]['speed']})
timeone = datetime.now()-timedelta(hours=1)
timethree = datetime.now()-timedelta(hours=3)
timesix = datetime.now()-timedelta(hours=6)
timetwelve = datetime.now()-timedelta(hours=12)
timetwentyfour = datetime.now()-timedelta(hours=24)
result1 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timeone).map(lambda x:Row(timespan='1',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at'],genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
result3 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timethree).map(lambda x:Row(timespan='3',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at'],genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
result6 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timesix).map(lambda x:Row(timespan='6',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at'],genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
result12 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timetwelve).map(lambda x:Row(timespan='12',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at'],genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
result24 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timetwentyfour).map(lambda x:Row(timespan='24',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at'],genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
if result1.count()>0:
    print 'result1 insert=======>',result1.take(1)
    session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source,'1'))
    resultschema1 = sqlcontext.createDataFrame(result1)
    resultschema1.write.format("org.apache.spark.sql.cassandra").options(table="statistics", keyspace="statistics").save(mode="append")
if result3.count()>0:
    print 'result3 insert=======>',result3.take(1)
    session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source,'3'))
    resultschema3 = sqlcontext.createDataFrame(result3)
    resultschema3.write.format("org.apache.spark.sql.cassandra").options(table="statistics", keyspace="statistics").save(mode="append")
if result6.count()>0:
    print 'result6 insert=======>',result6.take(1)
    session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source,'6'))
    resultschema6 = sqlcontext.createDataFrame(result6)
    resultschema6.write.format("org.apache.spark.sql.cassandra").options(table="statistics", keyspace="statistics").save(mode="append")
if result12.count()>0:
    print 'result12 insert=======>',result12.take(1)
    session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source,'12'))
    resultschema12 = sqlcontext.createDataFrame(result12)
    resultschema12.write.format("org.apache.spark.sql.cassandra").options(table="statistics", keyspace="statistics").save(mode="append")
if result24.count()>0:
    print 'result24 insert=======>',result24.take(1)
    session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source,'24'))
    resultschema24 = sqlcontext.createDataFrame(result24)
    resultschema24.write.format("org.apache.spark.sql.cassandra").options(table="statistics", keyspace="statistics").save(mode="append")
end = datetime.now(tz)
print 'timeone:%s timethree:%s timesix:%s timetwelve:%s timetwentyfour:%s' % (timeone,timethree,timesix,timetwelve,timetwentyfour)
print 'all done==================================:time:%s' % (datetime.now(tz))
print 'total time is:%s seconds' % ((end-begin).seconds)
EDIT:
This code has 2 parts: 1) reading messages from Kafka and saving them to Cassandra, and 2) reading data from Cassandra, analyzing it, and afterwards saving the analyzed data back to Cassandra. A simplified sketch of that structure is shown below, followed by the full code.
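To make the structure easier to see before the full listing, here is a minimal sketch of the two parts. The names in it are placeholders, not the real ones from my job: the topic, table and keyspace names and the helper functions save_to_cassandra and analyze are illustrative only, and sc.cassandraTable assumes the same Cassandra/Spark integration used in the real code below.

# Minimal sketch only -- 'topic', 'tablename', 'keyspace', save_to_cassandra and
# analyze are placeholder names, not the real ones from the full job below.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf().setAppName("sketch")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)              # 30-second micro-batches
sqlcontext = SQLContext(sc)

def save_to_cassandra(rdd):
    # Part 1: persist the incoming Kafka messages to Cassandra.
    if rdd.count() > 0:
        df = sqlcontext.createDataFrame(rdd.map(lambda x: Row(id=x[0], body=x[1])))
        df.write.format("org.apache.spark.sql.cassandra") \
            .options(table="tablename", keyspace="keyspace").save(mode="append")

def analyze(rdd):
    # Part 2: read data back from Cassandra, analyze it together with the batch,
    # and write the analyzed results to Cassandra (details omitted in this sketch).
    articles = sc.cassandraTable("keyspace", "tablename")   # same cassandraTable API as below
    # ... join 'rdd' with 'articles', compute statistics, write back as in part 1 ...
    return rdd

stream = KafkaUtils.createDirectStream(ssc, ['topic'], {"metadata.broker.list": "localhost:9092"})
stream.foreachRDD(save_to_cassandra)        # part 1 runs on every batch
stream.transform(analyze).pprint()          # part 2 runs on every batch
ssc.start()
ssc.awaitTermination()

The real job below follows this shape, just with several Kafka topics and several Cassandra tables.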
import sys
import json
from pyspark import SparkContext, SparkConf, RDDSampler
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, Row
from pyspark.streaming.kafka import OffsetRange, KafkaUtils, TopicAndPartition
from datetime import datetime, timedelta
from dateutil.parser import parse
import pickle
from cassandra.cluster import Cluster
from expiringdict import ExpiringDict
import pytz
from dateutil.tz import tzutc

tz = pytz.timezone('')
appname = str(sys.argv[1])
source = str(sys.argv[2])
cluster = Cluster(['localhost'])
session_statis = cluster.connect('keyspace')

def read_json(x):
    try:
        y = json.loads(x)
    except:
        y = 0
    return y

def categorytransform(x):
    try:
        body = json.loads(x['body'])
        return (body['article'])
    except:
        return 0

def transformindata(x):
    try:
        body = json.loads(x['body'])
        return (body['articles'])
    except:
        return 0

def axestransformdata(x):
    try:
        body = json.loads(x['body'])
        body['id'] = x['attrs']['id']
        return (body)
    except:
        return 0

def articleincache(rdd):
    rdd_channel = rdd.map(lambda x:(x[1]['channel'],{'id':x[0],'title':x[1]['title'],'thumbnail':x[1]['thumbnail'],'url':x[1]['url'],'created_at':x[1]['created_at']})) \
        .join(channels).map(lambda x:{'id':x[1][0]['id'],'title':x[1][0]['title'],'thumbnail':x[1][0]['thumbnail'],'url':x[1][0]['url'],'created_at':x[1][0]['created_at'],'author':x[1][1]['name']})
    rdd_cassandra = rdd.map(lambda x:(x[0],(x[0],x[1]['thumbnail'] if x[1]['thumbnail'] else '',x[1]['title'],x[1]['url'],datetime.strptime(parse(x[1]['created_at']).strftime('%Y-%m-%d %H:%M:%S'), "%Y-%m-%d %H:%M:%S")+timedelta(hours=8),source,x[1]['category'] if x[1]['category'] else '',x[1]['channel']))) \
        .subtract(articles)
    rdd_article = rdd_cassandra.map(lambda x:Row(id=x[1][0],source=x[1][5],thumbnail=x[1][1],title=x[1][2],url=x[1][3],created_at=x[1][4],category=x[1][6],channel=x[1][7]))
    rdd_schedule = rdd_cassandra.map(lambda x:Row(source=x[1][5],type='article',scheduled_for=x[1][4]+timedelta(minutes=5),id=x[1][0]))
    rdd_article_by_created_at = rdd_cassandra.map(lambda x:Row(source=x[1][5],created_at=x[1][4],article=x[1][0]))
    rdd_article_by_url = rdd_cassandra.map(lambda x:Row(url=x[1][3],article=x[1][0]))
    if rdd_article.count()>0:
        result_rdd_article = sqlcontext.createDataFrame(rdd_article)
        result_rdd_article.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")
    if rdd_schedule.count()>0:
        result_rdd_schedule = sqlcontext.createDataFrame(rdd_schedule)
        result_rdd_schedule.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")
    if rdd_article_by_created_at.count()>0:
        result_rdd_article_by_created_at = sqlcontext.createDataFrame(rdd_article_by_created_at)
        result_rdd_article_by_created_at.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")
    if rdd_article_by_url.count()>0:
        result_rdd_article_by_url = sqlcontext.createDataFrame(rdd_article_by_url)
        result_rdd_article_by_url.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")

def categoryincache(x):
    article_update = articles.join(x).map(lambda x:Row(id=x[1][0][0],source=x[1][0][5],thumbnail=x[1][0][1],title=x[1][0][2],url=x[1][0][3],created_at=x[1][0][4],category=x[1][1]['category'],channel=x[1][0][7]))
    if article_update.count()>0:
        result_article_update = sqlcontext.createDataFrame(article_update)
        result_article_update.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")

def axesincache(rdd):
    if rdd.count()>0:
        axes_rdd = rdd.map(lambda x:Row(article=x[0],at=datetime.strptime(parse(x[1]['at']).strftime('%Y-%m-%d %H:%M:%S'), "%Y-%m-%d %H:%M:%S")+timedelta(hours=8),comments=x[1]['comments'],likes=x[1]['likes'],reads=0,shares=0))
        axesresult = sqlcontext.createDataFrame(axes_rdd)
        axesresult.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")

def statistics(rdd):
    article_channels = articlestat.join(channels).map(lambda x:(x[1][0]['id'],{'id':x[1][0]['id'],'thumbnail':x[1][0]['thumbnail'],'title':x[1][0]['title'],'url':x[1][0]['url'],'created_at':x[1][0]['created_at'],'source':x[1][0]['source'],'category':x[1][0]['category'],'author':x[1][1]['name']}))
    speed_rdd = axes.map(lambda x:(x.article,[[x.at,x.comments,x.likes,x.reads,x.shares]])) \
        .reduceByKey(lambda x,y:x+y) \
        .map(lambda x:(x[0],sorted(x[1],key=lambda y:y[0],reverse=True)[0],sorted(x[1],key=lambda y:y[0],reverse=True)[1]) if len(x[1])>=2 else (x[0],sorted(x[1],key=lambda y:y[0],reverse=True)[0],[sorted(x[1],key=lambda y:y[0],reverse=True)[0][0]-timedelta(seconds=300),0,0,0,0])) \
        .filter(lambda x:(x[1][0]-x[2][0]).seconds>0) \
        .map(lambda x:(x[0],{'id':x[0],'comments':x[1][1],'likes':x[1][2],'reads':x[1][3],'shares':x[1][4],'speed':5*300*((x[1][1]-x[2][1])/((x[1][0]-x[2][0]).seconds/60.0))})) \
        .filter(lambda x:x[1]['comments']>0)
    statistics = article_channels.join(speed_rdd) \
        .map(lambda x:{'id':x[1][0]['id'],'thumbnail':x[1][0]['thumbnail'],'title':x[1][0]['title'],'url':x[1][0]['url'],'created_at':x[1][0]['created_at'],'source':x[1][0]['source'],'category':x[1][0]['category'],'author':x[1][0]['author'],'comments':x[1][1]['comments'],'likes':x[1][1]['likes'],'reads':x[1][1]['reads'],'shares':x[1][1]['shares'],'speed':x[1][1]['speed']})
    timeone = datetime.now()-timedelta(hours=1)
    timethree = datetime.now()-timedelta(hours=3)
    timesix = datetime.now()-timedelta(hours=6)
    timetwelve = datetime.now()-timedelta(hours=12)
    timetwentyfour = datetime.now()-timedelta(hours=24)
    result1 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timeone).map(lambda x:Row(timespan='1',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    result3 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timethree).map(lambda x:Row(timespan='3',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    result6 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timesix).map(lambda x:Row(timespan='6',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    result12 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timetwelve).map(lambda x:Row(timespan='12',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    result24 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timetwentyfour).map(lambda x:Row(timespan='24',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    if result1.count()>0:
        session_statis.execute('DELETE FROM tablename WHERE source = %s AND timespan = %s', (source,'1'))
        resultschema1 = sqlcontext.createDataFrame(result1)
        resultschema1.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")
    if result3.count()>0:
        session_statis.execute('DELETE FROM tablename WHERE source = %s AND timespan = %s', (source,'3'))
        resultschema3 = sqlcontext.createDataFrame(result3)
        resultschema3.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")
    if result6.count()>0:
        session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source,'6'))
        resultschema6 = sqlcontext.createDataFrame(result6)
        resultschema6.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")
    if result12.count()>0:
        session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source,'12'))
        resultschema12 = sqlcontext.createDataFrame(result12)
        resultschema12.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")
    if result24.count()>0:
        session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source,'24'))
        resultschema24 = sqlcontext.createDataFrame(result24)
        resultschema24.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")

conf = SparkConf().setAppName(appname)
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc,30)
sqlcontext = SQLContext(sc)
channels = sc.cassandraTable("keyspace","tablename").map(lambda x:(x.id,{'name':x.name}))
articles = sc.cassandraTable("keyspace","tablename").map(lambda x:(x.id,(x.id,x.thumbnail,x.title,x.url,x.created_at+timedelta(hours=8),x.source,x.category,x.channel)))
articlestat = sc.cassandraTable('keyspace','tablename').map(lambda x:(x.channel,{'id':x.id,'thumbnail':x.thumbnail,'title':x.title,'url':x.url,'created_at':x.created_at,'source':x.source,'category':x.category,'channel':x.channel}))
axes = sc.cassandraTable('keyspace','tablename')
topic = 'topic'
kafkaparams = {"metadata.broker.list": "localhost:9092"}
category = 'category_topic'
category_stream = KafkaUtils.createDirectStream(ssc, [category], kafkaparams)
category_join_stream = category_stream.map(lambda x:read_json(x[1])).filter(lambda x:x!=0).map(lambda x:categorytransform(x)).filter(lambda x:x!=0).map(lambda x:(x['id'],x))
category_join_stream.transform(categoryincache).pprint()
article_stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaparams)
article_join_stream = article_stream.map(lambda x:read_json(x[1])).filter(lambda x:x!=0).map(lambda x:transformindata(x)).filter(lambda x:x!=0).flatMap(lambda x:(a for a in x)).map(lambda x:(x['id'].encode("utf-8"),x))
article_join_stream.transform(articleincache).pprint()
axes_topic = 'axes_topic'
axes_stream = KafkaUtils.createDirectStream(ssc, [axes_topic], kafkaparams)
axes_join_stream = axes_stream.filter(lambda x:'delete' not in str(x)).map(lambda x:read_json(x[1])).filter(lambda x:x!=0).map(lambda x:axestransformdata(x)).filter(lambda x:x!=0).map(lambda x:(str(x['id']),x))#.map(lambda x:(x[0],{'id':x[0], 'attitudes':x[1]['likes'],'reposts':0,'comments':x[1]['comments'],'speed':x[1]['comments']}))
axes_join_stream.transform(axesincache).pprint()
stat = article_join_stream.map(lambda x:x['id']).window(5*60,5*60)
stat.transform(statistics).pprint()
ssc.start()             # start computation
ssc.awaitTermination()
Thanks a lot for any reply!