import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.{col, count, lit, when}
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.expressions.Window
object accounts_ver_creation
{
def main(args: Array[String])
{
val conf = new SparkConf()
val sc = new SparkContext(conf)
// The sqoop.* tables live in the Hive metastore, so a HiveContext is needed
// here rather than a plain SQLContext.
val sqlContext = new HiveContext(sc)
// Sentinel end date for rows that are still current
val high_date = "9999-12-31 00:00:00"
val df_source_main = sqlContext.sql("SELECT * FROM sqoop.accounts")
val df_source_aud = sqlContext.sql("SELECT * FROM sqoop.accounts_aud")
// Audit rows flagged as updates (U) and deletes (D)
val df_u = df_source_aud.filter(col("action_code") === "U")
val df_d = df_source_aud.filter(col("action_code") === "D")
val raw_column_names = df_source_main.columns
val column_names = raw_column_names ++ Array("is_current", "is_updated", "is_deleted", "start_date", "enddate", "tot")
// withColumn expects a Column, but groupBy(...).agg(...) returns a DataFrame,
// so the per-account counts are computed as separate aggregates and joined
// back on the grouping key. df_ver is reused below for version_key.
val df_tot = df_source_main.groupBy("account_number").agg(count("account_number").as("tot"))
val df_ver = df_source_aud.groupBy("account_number").agg(count("account_number").as("ver"))
val res = df_source_main.join(df_tot, Seq("account_number"), "left")
// Current image of every account: open-ended end date, flagged as current
val df_target_main = res
  .withColumn("start_date", col("update_date"))
  .withColumn("enddate", lit(high_date))
  .withColumn("is_current", lit(1))
  .withColumn("is_updated", lit(false))
  .withColumn("is_deleted", lit(false))
  .select(column_names.map(col): _*)
// Assumption: update history comes from df_u and delete history from df_d (the
// original swapped them, and one DataFrame cannot reference another DataFrame's
// columns inside withColumn). Each frame refers only to its own columns.
val df_target_u = df_u
  .join(df_tot, Seq("account_number"), "left").na.fill(0, Seq("tot"))
  .withColumn("enddate", col("end_date"))
  .withColumn("start_date", col("create_date"))
  .withColumn("is_current", lit(0))
  .withColumn("is_updated", lit(true))
  .withColumn("is_deleted", lit(false))
  .select(column_names.map(col): _*)
val df_target_d = df_d
  .join(df_tot, Seq("account_number"), "left").na.fill(0, Seq("tot"))
  .withColumn("enddate", col("end_update"))
  .withColumn("start_date", col("update_date"))
  .withColumn("is_current", lit(0))
  .withColumn("is_updated", lit(false))
  .withColumn("is_deleted", lit(true))
  .select(column_names.map(col): _*)
// unionAll needs identical column order, which the shared select guarantees
val df_merge = df_target_main.unionAll(df_target_u).unionAll(df_target_d)
// version_key = number of audit rows per account, attached with a join and
// defaulted to 0 for accounts that were never updated
val df_merge_final = df_merge
  .join(df_ver.withColumnRenamed("ver", "version_key"), Seq("account_number"), "left")
  .na.fill(0, Seq("version_key"))
sqlContext.sql("DROP TABLE IF EXISTS sqoop.accounts_ver")
df_merge_final.write.format("orc").option("path", "/user/hive/warehouse/sqoop.db/accounts_ver").mode("overwrite").saveAsTable("sqoop.accounts_ver")
}
}
Error Message:
<console>:240: error: type mismatch;
found : org.apache.spark.sql.DataFrame
required: org.apache.spark.sql.Column
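
The mismatch is exactly what the compiler reports: withColumn takes a column name and a Column, but groupBy("account_number").agg(count(...)) evaluates to a whole new DataFrame, which is what the original "tot" and "version_key" assignments were passing in. A minimal sketch of the two usual fixes, assuming only a DataFrame with an account_number column (the withAccountCounts names are illustrative, not part of the original job):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.expressions.Window

def withAccountCounts(df: DataFrame): DataFrame = {
  // Does not compile: agg(...) returns a DataFrame, not a Column.
  // df.withColumn("tot", df.groupBy("account_number").agg(count("account_number")))

  // Fix 1: aggregate separately, then join the count back on the grouping key.
  val counts = df.groupBy("account_number").agg(count("account_number").as("tot"))
  df.join(counts, Seq("account_number"), "left")
}

// Fix 2: a window aggregate returns a Column, so it can be passed straight to
// withColumn (on Spark 1.6, window functions require a HiveContext).
def withAccountCountsWindowed(df: DataFrame): DataFrame =
  df.withColumn("tot", count("account_number").over(Window.partitionBy("account_number")))

The corrected job above uses the join form, because the "ver"/"version_key" counts come from a different table (sqoop.accounts_aud) and therefore cannot be expressed as a window over the main DataFrame.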