Hi,
I need help with my code; I am trying to understand and fix the issue here. I am using foreachPartition to call Python modules, and the output is expected to be written to HDFS when the job runs in YARN mode. This works fine in local mode, but not on YARN.
Any inputs on this would help.
import json
from pyspark import TaskContext

def getJSON(iterator):
    # TaskContext.get() is the supported way to read the partition id inside a task
    ctx = TaskContext.get()
    P_collection = []
    for data in iterator:
        jsondict = dict()
        jsondict["class_data"] = populatejsondata(
            str(data["Pid"]), str(data["Rid"]), str(data["Mid"]),
            str(data["AM_id"]), str(data["H_data"]), str(data["CL_data"]))
        P_collection.append(jsondict)
    # Write this partition's records to a local file named after the partition id
    with open('/tmp/p/class' + str(ctx.partitionId()) + '.json', 'w') as FILE:
        json.dump(P_collection, FILE)
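
If the problem is that open() on an executor writes to that worker's local /tmp rather than to HDFS, one thing I am considering is writing the partition file to HDFS directly from inside the partition function. A minimal sketch, assuming the third-party hdfs package (HdfsCLI) is installed on every worker node; NAMENODE_HOST, the WebHDFS port, and the output directory are placeholders:

import json
from hdfs import InsecureClient
from pyspark import TaskContext

def writePartitionToHDFS(iterator):
    ctx = TaskContext.get()
    # The populatejsondata() step from getJSON would go here; asDict() is a stand-in
    records = [row.asDict() for row in iterator]
    # Placeholder endpoint: point this at the cluster's WebHDFS address
    client = InsecureClient('http://NAMENODE_HOST:9870')
    client.write('/user/me/p/class' + str(ctx.partitionId()) + '.json',
                 data=json.dumps(records), encoding='utf-8', overwrite=True)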
# ----- calling the function
data_df = data.repartition('C_ID')
data_new = data_df.rdd
data_new.foreachPartition(getJSON)  # foreachPartition returns None, so there is nothing to assign
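
Alternatively, would it be better to avoid foreachPartition entirely and let Spark's own writer handle the HDFS output? A sketch of what I have in mind; populate_json_udf is hypothetical and would just wrap my existing populatejsondata module, and the output path is a placeholder:

import json
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Hypothetical UDF wrapping populatejsondata so the DataFrame writer can handle HDFS
populate_json_udf = F.udf(
    lambda pid, rid, mid, am_id, h, cl: json.dumps(
        {"class_data": populatejsondata(str(pid), str(rid), str(mid),
                                        str(am_id), str(h), str(cl))}),
    StringType())

(data_df
 .withColumn("json_line",
             populate_json_udf("Pid", "Rid", "Mid", "AM_id", "H_data", "CL_data"))
 .select("json_line")
 .write.mode("overwrite").text("hdfs:///user/me/p/"))  # placeholder HDFS path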