pyspark - Why does my Datastax Spark app use unusual amounts of memory?


My machines have 32 GB of memory each. I have 3 machines; they currently have roughly 4 GB, 10 GB and 14 GB of memory free, and the free memory keeps decreasing. The system has been running for 44 hours, memory usage is high and keeps increasing, and this will eventually lead to a crash.

Here is a screenshot of the Spark executors:

[executor screenshot]

This PySpark code uses more and more memory the longer it runs, which I don't want because the system will crash. Any suggestions on how to change this?

begin = datetime.now(tz)
print 'begin:%s' % (begin)

# join article metadata with the channel (author) names
article_channels = articlestat.join(channels).map(lambda x:(x[1][0]['id'],{'id':x[1][0]['id'],'thumbnail':x[1][0]['thumbnail'],'title':x[1][0]['title'],'url':x[1][0]['url'],'created_at':x[1][0]['created_at'],'source':x[1][0]['source'],'category':x[1][0]['category'],'author':x[1][1]['name']}))

# compute a per-article 'speed' from two of its measurement snapshots
speed_rdd = axes.map(lambda x:(x.article,[[x.at,x.comments,x.likes,x.reads,x.shares]])) \
            .reduceByKey(lambda x,y:x+y) \
            .filter(lambda x:len(x[1])>=2) \
            .map(lambda x:(x[0],sorted(x[1],key=lambda y:y[1],reverse=True)[0],sorted(x[1],key=lambda y:y[1],reverse=True)[1])) \
            .filter(lambda x:(x[1][0]-x[2][0]).seconds>0) \
            .map(lambda x:(x[0],{'id':x[0],'comments':x[1][1],'likes':x[1][2],'reads':x[1][3],'shares':x[1][4],'speed':5*300*((x[1][1]-x[2][1])/((x[1][0]-x[2][0]).seconds/60.0))})) \
            .filter(lambda x:x[1]['comments']>0)

statistics = article_channels.join(speed_rdd) \
            .map(lambda x:{'id':x[1][0]['id'],'thumbnail':x[1][0]['thumbnail'],'title':x[1][0]['title'],'url':x[1][0]['url'],'created_at':x[1][0]['created_at'],'source':x[1][0]['source'],'category':x[1][0]['category'],'author':x[1][0]['author'],'comments':x[1][1]['comments'],'likes':x[1][1]['likes'],'reads':x[1][1]['reads'],'shares':x[1][1]['shares'],'speed':x[1][1]['speed']})

timeone = datetime.now()-timedelta(hours=1)
timethree = datetime.now()-timedelta(hours=3)
timesix = datetime.now()-timedelta(hours=6)
timetwelve = datetime.now()-timedelta(hours=12)
timetwentyfour = datetime.now()-timedelta(hours=24)

# one result RDD per reporting window (1/3/6/12/24 hours)
result1 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timeone).map(lambda x:Row(timespan='1',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at'],genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
result3 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timethree).map(lambda x:Row(timespan='3',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at'],genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
result6 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timesix).map(lambda x:Row(timespan='6',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at'],genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
result12 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timetwelve).map(lambda x:Row(timespan='12',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at'],genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
result24 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timetwentyfour).map(lambda x:Row(timespan='24',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at'],genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))

if result1.count()>0:
    print 'result1 insert=======>',result1.take(1)
    session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source,'1'))
    resultschema1 = sqlcontext.createDataFrame(result1)
    resultschema1.write.format("org.apache.spark.sql.cassandra").options(table="statistics", keyspace="statistics").save(mode="append")
if result3.count()>0:
    print 'result3 insert=======>',result3.take(1)
    session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source,'3'))
    resultschema3 = sqlcontext.createDataFrame(result3)
    resultschema3.write.format("org.apache.spark.sql.cassandra").options(table="statistics", keyspace="statistics").save(mode="append")
if result6.count()>0:
    print 'result6 insert=======>',result6.take(1)
    session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source,'6'))
    resultschema6 = sqlcontext.createDataFrame(result6)
    resultschema6.write.format("org.apache.spark.sql.cassandra").options(table="statistics", keyspace="statistics").save(mode="append")
if result12.count()>0:
    print 'result12 insert=======>',result12.take(1)
    session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source,'12'))
    resultschema12 = sqlcontext.createDataFrame(result12)
    resultschema12.write.format("org.apache.spark.sql.cassandra").options(table="statistics", keyspace="statistics").save(mode="append")
if result24.count()>0:
    print 'result24 insert=======>',result24.take(1)
    session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source,'24'))
    resultschema24 = sqlcontext.createDataFrame(result24)
    resultschema24.write.format("org.apache.spark.sql.cassandra").options(table="statistics", keyspace="statistics").save(mode="append")

end = datetime.now(tz)
print 'timeone:%s  timethree:%s timesix:%s timetwelve:%s timetwentyfour:%s' % (timeone,timethree,timesix,timetwelve,timetwentyfour)
print 'all done==================================:time:%s' % (datetime.now(tz))
print 'total time is:%s seconds' % ((end-begin).seconds)
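For context, note that each result RDD above is evaluated at least twice per run: once for count() and again for createDataFrame(), and all five share the same joins through the statistics RDD. The following is only a minimal sketch (not the original code) of how those passes could be cached per run and released afterwards, reusing the names statistics, result1 … result24, sqlcontext, session_statis and source from the listing above:

from pyspark import StorageLevel

# Sketch: persist the shared 'statistics' RDD once per run so the five result
# RDDs derived from it don't each recompute the joins, and release everything
# explicitly at the end so cached blocks don't accumulate across runs.
statistics.persist(StorageLevel.MEMORY_AND_DISK)

for timespan, result in [('1', result1), ('3', result3), ('6', result6),
                         ('12', result12), ('24', result24)]:
    result.persist(StorageLevel.MEMORY_AND_DISK)  # reused by count() and createDataFrame()
    if result.count() > 0:
        session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s',
                               (source, timespan))
        sqlcontext.createDataFrame(result) \
            .write.format("org.apache.spark.sql.cassandra") \
            .options(table="statistics", keyspace="statistics") \
            .save(mode="append")
    result.unpersist()  # drop the cached partitions for this timespan

statistics.unpersist()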

Edit:

This code has 2 parts: 1) reading messages from Kafka and saving them to Cassandra; 2) reading data back from Cassandra, analyzing it, and afterwards saving the analyzed data to Cassandra.
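In outline, that structure looks roughly like the skeleton below. This is only a condensed sketch of the same pattern, not the actual code; the topic, table, keyspace and field names (id, body) are placeholders. The full code follows after it.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf().setAppName("app")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)          # 30-second batches, as in the full code
sqlcontext = SQLContext(sc)

def save_to_cassandra(rdd):
    # Part 1: write each Kafka batch to Cassandra (rows must match the table schema).
    rows = rdd.map(lambda kv: Row(id=kv[0], body=kv[1]))   # placeholder mapping
    if rows.count() > 0:
        sqlcontext.createDataFrame(rows) \
            .write.format("org.apache.spark.sql.cassandra") \
            .options(table="tablename", keyspace="keyspace") \
            .save(mode="append")
    return rdd

def analyze(rdd):
    # Part 2: read back from Cassandra, compute statistics and write them out
    # (body omitted; see the statistics() function in the full code).
    return rdd

stream = KafkaUtils.createDirectStream(ssc, ['topic'],
                                       {"metadata.broker.list": "localhost:9092"})
stream.transform(save_to_cassandra).pprint()
stream.window(5 * 60, 5 * 60).transform(analyze).pprint()

ssc.start()
ssc.awaitTermination()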

import sys
import json
from pyspark import SparkContext, SparkConf, RDDSampler
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, Row
from pyspark.streaming.kafka import OffsetRange, KafkaUtils, TopicAndPartition
from datetime import datetime, timedelta
from dateutil.parser import parse
import pickle
from cassandra.cluster import Cluster
from expiringdict import ExpiringDict
import pytz
from dateutil.tz import tzutc

tz = pytz.timezone('')
appname = str(sys.argv[1])
source = str(sys.argv[2])
cluster = Cluster(['localhost'])
session_statis = cluster.connect('keyspace')

def read_json(x):
    try:
        y = json.loads(x)
    except:
        y = 0
    return y

def categorytransform(x):
    try:
        body = json.loads(x['body'])
        return (body['article'])
    except:
        return 0

def transformindata(x):
    try:
        body = json.loads(x['body'])
        return (body['articles'])
    except:
        return 0

def axestransformdata(x):
    try:
        body = json.loads(x['body'])
        body['id'] = x['attrs']['id']
        return (body)
    except:
        return 0

# part 1: write incoming Kafka messages to Cassandra
def articleincache(rdd):
    rdd_channel = rdd.map(lambda x:(x[1]['channel'],{'id':x[0],'title':x[1]['title'],'thumbnail':x[1]['thumbnail'],'url':x[1]['url'],'created_at':x[1]['created_at']})) \
        .join(channels).map(lambda x:{'id':x[1][0]['id'],'title':x[1][0]['title'],'thumbnail':x[1][0]['thumbnail'],'url':x[1][0]['url'],'created_at':x[1][0]['created_at'],'author':x[1][1]['name']})
    rdd_cassandra = rdd.map(lambda x:(x[0],(x[0],x[1]['thumbnail'] if x[1]['thumbnail'] else '',x[1]['title'],x[1]['url'],datetime.strptime(parse(x[1]['created_at']).strftime('%Y-%m-%d %H:%M:%S'), "%Y-%m-%d %H:%M:%S")+timedelta(hours=8),source,x[1]['category'] if x[1]['category'] else '',x[1]['channel']))) \
        .subtract(articles)
    rdd_article = rdd_cassandra.map(lambda x:Row(id=x[1][0],source=x[1][5],thumbnail=x[1][1],title=x[1][2],url=x[1][3],created_at=x[1][4],category=x[1][6],channel=x[1][7]))
    rdd_schedule = rdd_cassandra.map(lambda x:Row(source=x[1][5],type='article',scheduled_for=x[1][4]+timedelta(minutes=5),id=x[1][0]))
    rdd_article_by_created_at = rdd_cassandra.map(lambda x:Row(source=x[1][5],created_at=x[1][4],article=x[1][0]))
    rdd_article_by_url = rdd_cassandra.map(lambda x:Row(url=x[1][3],article=x[1][0]))
    if rdd_article.count()>0:
        result_rdd_article = sqlcontext.createDataFrame(rdd_article)
        result_rdd_article.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")
    if rdd_schedule.count()>0:
        result_rdd_schedule = sqlcontext.createDataFrame(rdd_schedule)
        result_rdd_schedule.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")
    if rdd_article_by_created_at.count()>0:
        result_rdd_article_by_created_at = sqlcontext.createDataFrame(rdd_article_by_created_at)
        result_rdd_article_by_created_at.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")
    if rdd_article_by_url.count()>0:
        result_rdd_article_by_url = sqlcontext.createDataFrame(rdd_article_by_url)
        result_rdd_article_by_url.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")

def categoryincache(x):
    article_update = articles.join(x).map(lambda x:Row(id=x[1][0][0],source=x[1][0][5],thumbnail=x[1][0][1],title=x[1][0][2],url=x[1][0][3],created_at=x[1][0][4],category=x[1][1]['category'],channel=x[1][0][7]))
    if article_update.count()>0:
        result_article_update = sqlcontext.createDataFrame(article_update)
        result_article_update.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")

def axesincache(rdd):
    if rdd.count()>0:
        axes_rdd = rdd.map(lambda x:Row(article=x[0],at=datetime.strptime(parse(x[1]['at']).strftime('%Y-%m-%d %H:%M:%S'), "%Y-%m-%d %H:%M:%S")+timedelta(hours=8),comments=x[1]['comments'],likes=x[1]['likes'],reads=0,shares=0))
        axesresult = sqlcontext.createDataFrame(axes_rdd)
        axesresult.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")

# part 2: read data back from Cassandra, analyze it and save the results
def statistics(rdd):
    article_channels = articlestat.join(channels).map(lambda x:(x[1][0]['id'],{'id':x[1][0]['id'],'thumbnail':x[1][0]['thumbnail'],'title':x[1][0]['title'],'url':x[1][0]['url'],'created_at':x[1][0]['created_at'],'source':x[1][0]['source'],'category':x[1][0]['category'],'author':x[1][1]['name']}))
    speed_rdd = axes.map(lambda x:(x.article,[[x.at,x.comments,x.likes,x.reads,x.shares]])) \
                .reduceByKey(lambda x,y:x+y) \
                .map(lambda x:(x[0],sorted(x[1],key=lambda y:y[0],reverse=True)[0],sorted(x[1],key=lambda y:y[0],reverse=True)[1]) if len(x[1])>=2 else (x[0],sorted(x[1],key=lambda y:y[0],reverse=True)[0],[sorted(x[1],key=lambda y:y[0],reverse=True)[0][0]-timedelta(seconds=300),0,0,0,0])) \
                .filter(lambda x:(x[1][0]-x[2][0]).seconds>0) \
                .map(lambda x:(x[0],{'id':x[0],'comments':x[1][1],'likes':x[1][2],'reads':x[1][3],'shares':x[1][4],'speed':5*300*((x[1][1]-x[2][1])/((x[1][0]-x[2][0]).seconds/60.0))})) \
                .filter(lambda x:x[1]['comments']>0)
    statistics = article_channels.join(speed_rdd) \
                .map(lambda x:{'id':x[1][0]['id'],'thumbnail':x[1][0]['thumbnail'],'title':x[1][0]['title'],'url':x[1][0]['url'],'created_at':x[1][0]['created_at'],'source':x[1][0]['source'],'category':x[1][0]['category'],'author':x[1][0]['author'],'comments':x[1][1]['comments'],'likes':x[1][1]['likes'],'reads':x[1][1]['reads'],'shares':x[1][1]['shares'],'speed':x[1][1]['speed']})
    timeone = datetime.now()-timedelta(hours=1)
    timethree = datetime.now()-timedelta(hours=3)
    timesix = datetime.now()-timedelta(hours=6)
    timetwelve = datetime.now()-timedelta(hours=12)
    timetwentyfour = datetime.now()-timedelta(hours=24)
    result1 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timeone).map(lambda x:Row(timespan='1',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    result3 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timethree).map(lambda x:Row(timespan='3',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    result6 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timesix).map(lambda x:Row(timespan='6',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    result12 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timetwelve).map(lambda x:Row(timespan='12',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    result24 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timetwentyfour).map(lambda x:Row(timespan='24',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre='',reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    if result1.count()>0:
        session_statis.execute('DELETE FROM tablename WHERE source = %s AND timespan = %s', (source,'1'))
        resultschema1 = sqlcontext.createDataFrame(result1)
        resultschema1.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")
    if result3.count()>0:
        session_statis.execute('DELETE FROM tablename WHERE source = %s AND timespan = %s', (source,'3'))
        resultschema3 = sqlcontext.createDataFrame(result3)
        resultschema3.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")
    if result6.count()>0:
        session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source,'6'))
        resultschema6 = sqlcontext.createDataFrame(result6)
        resultschema6.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")
    if result12.count()>0:
        session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source,'12'))
        resultschema12 = sqlcontext.createDataFrame(result12)
        resultschema12.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")
    if result24.count()>0:
        session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source,'24'))
        resultschema24 = sqlcontext.createDataFrame(result24)
        resultschema24.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace="keyspace").save(mode="append")

conf = SparkConf().setAppName(appname)
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)
sqlcontext = SQLContext(sc)

# base RDDs read from Cassandra at driver start-up
channels = sc.cassandraTable("keyspace","tablename").map(lambda x:(x.id,{'name':x.name}))
articles = sc.cassandraTable("keyspace","tablename").map(lambda x:(x.id,(x.id,x.thumbnail,x.title,x.url,x.created_at+timedelta(hours=8),x.source,x.category,x.channel)))
articlestat = sc.cassandraTable('keyspace','tablename').map(lambda x:(x.channel,{'id':x.id,'thumbnail':x.thumbnail,'title':x.title,'url':x.url,'created_at':x.created_at,'source':x.source,'category':x.category,'channel':x.channel}))
axes = sc.cassandraTable('keyspace','tablename')

# Kafka direct streams
topic = 'topic'
kafkaparams = {"metadata.broker.list": "localhost:9092"}
category = 'category_topic'
category_stream = KafkaUtils.createDirectStream(ssc, [category], kafkaparams)
category_join_stream = category_stream.map(lambda x:read_json(x[1])).filter(lambda x:x!=0).map(lambda x:categorytransform(x)).filter(lambda x:x!=0).map(lambda x:(x['id'],x))
category_join_stream.transform(categoryincache).pprint()

article_stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaparams)
article_join_stream = article_stream.map(lambda x:read_json(x[1])).filter(lambda x:x!=0).map(lambda x:transformindata(x)).filter(lambda x:x!=0).flatMap(lambda x:(a for a in x)).map(lambda x:(x['id'].encode("utf-8"),x))
article_join_stream.transform(articleincache).pprint()

axes_topic = 'axes_topic'
axes_stream = KafkaUtils.createDirectStream(ssc, [axes_topic], kafkaparams)
axes_join_stream = axes_stream.filter(lambda x:'delete' not in str(x)).map(lambda x:read_json(x[1])).filter(lambda x:x!=0).map(lambda x:axestransformdata(x)).filter(lambda x:x!=0).map(lambda x:(str(x['id']),x))  #.map(lambda x:(x[0],{'id':x[0], 'attitudes':x[1]['likes'],'reposts':0,'comments':x[1]['comments'],'speed':x[1]['comments']}))
axes_join_stream.transform(axesincache).pprint()

# every 5 minutes, run the statistics job over the articles seen in that window
stat = article_join_stream.map(lambda x:x['id']).window(5*60, 5*60)
stat.transform(statistics).pprint()

ssc.start()             # start the computation
ssc.awaitTermination()
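For reference: in the listing, channels, articles, articlestat and axes are built once with sc.cassandraTable(...) when the driver starts and are then re-joined in every window. The sketch below is not my actual code, just a minimal illustration of the alternative of loading those tables inside the per-window function, so each window works on current data and nothing is carried across batches; the table and keyspace names are placeholders as in the rest of the post.

# Sketch only: read the Cassandra tables per window instead of once at start-up.
def statistics(rdd):
    channels = sc.cassandraTable('keyspace', 'tablename').map(lambda x: (x.id, {'name': x.name}))
    articlestat = sc.cassandraTable('keyspace', 'tablename') \
                    .map(lambda x: (x.channel, {'id': x.id, 'title': x.title, 'created_at': x.created_at}))
    axes = sc.cassandraTable('keyspace', 'tablename')
    # ... same joins, filters and writes as in the statistics() function above ...
    return rdd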

Thanks a lot for your reply!

