AWS logs stop writing to CloudWatch after a certain step

0 votes

I have an AWS job that reads CSV files with PySpark and writes its logs to AWS CloudWatch. The logs appear for the initial steps, but after a certain step no more logs are written.

```

# assumed to be imported at module level (the code below uses Window and f):
# from pyspark.sql import Window
# from pyspark.sql import functions as f

for i, key in enumerate(keys):
    source_path = "s3://" + source_bucket + "/" + key
    if conf_format == 'csv' or conf_format == 'text':
        # source_df = spark.read.option("header", conf_header).option("inferSchema", "true").option("delimiter", conf_delimiter).csv(source_path)
        validated_df_2 = schema_validation(source_path)
        source_df_2 = validated_df_2.filter(validated_df_2.valid_rec == "Y")
        print("printing source_df")
        source_df = source_df_2.drop(source_df_2.valid_rec)
        print("printing source_df after dropping column")
        source_df.printSchema()
        source_df.show(5)
    elif conf_format == 'json':
        source_df = spark.read.option("multiline", "true").json(source_path)
    elif conf_format == 'avro':
        source_df = spark.read.format("com.databricks.spark.avro").load(source_path)
    if i == 0:
        target_df = source_df
    else:
        target_df = target_df.union(source_df)

ct_before = target_df.count()
# Drop rows where every column is null (na.drop returns a new DataFrame, so assign the result)
target_df = target_df.na.drop("all")
# Convert column names to lower case
lower_df = target_df.toDF(*[c.lower() for c in target_df.columns])
# Convert slash into hyphen in column names
col_df = lower_df.toDF(*list(map(lambda col: col if '/' not in col else col[1:].replace('/', '-'), lower_df.columns)))
# Remove whitespace from column names
final_df = col_df.toDF(*(c.replace(' ', '') for c in col_df.columns))
# Remove duplicates: number rows within identical-row partitions and keep row 1
col = final_df.columns
col1 = final_df.columns[0]
print(col)
print(col1)
win = Window.partitionBy(final_df.columns).orderBy(col1)
df_with_rn = final_df.withColumn("row_num", f.row_number().over(win))
df_with_rn.createOrReplaceTempView("t_stage")
deduped_df = spark.sql("""select * from t_stage where row_num = 1""")
delta_df = deduped_df.drop(deduped_df.row_num)
print("show delta df schema and data")
delta_df.printSchema()  # logs appear in CloudWatch up to this line; after this there are no logs and the job runs forever
delta_df.show(5)
# ... more than 1000 lines follow
```
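For reference, this is a minimal sketch of what I could add around the step where the logs stop, to check whether the hang is in the Spark computation itself rather than in the logging. It assumes driver stdout is what ends up in CloudWatch (as with the prints above); the logger name is just an example.

```python
import logging
import sys

# Log to stdout with explicit flushes so messages are not buffered
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
log = logging.getLogger("csv_job")  # hypothetical logger name

log.info("before dedup count")
sys.stdout.flush()

# count() forces the whole lineage (union of all keys + window) to execute,
# so a hang here would mean the computation is stuck, not the log delivery
log.info("deduped rows: %s", deduped_df.count())
sys.stdout.flush()

log.info("after dedup count")
sys.stdout.flush()
```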

Jul 30, 2021 in Apache Spark by Anjali

