package com.cloudera.cde.workflow;

import com.cloudera.cde.util.CliArgs$;
import java.time.Instant;
import java.util.TimeZone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.DoubleType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.TimestampType$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Array$;
import scala.Predef$;
import scala.StringContext;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.reflect.ClassTag$;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.NonLocalReturnControl;

/* compiled from: BiWorkflow.scala */
/* loaded from: input_file:com/cloudera/cde/workflow/BiWorkflow$.class */
public final class BiWorkflow$ {
    /** Singleton instance slot; the private constructor assigns {@code this} to it. */
    public static final BiWorkflow$ MODULE$ = null;
    /** SLF4J logger, obtained in the constructor from {@code LoggerFactory.getLogger(getClass())}. */
    private final Logger com$cloudera$cde$workflow$BiWorkflow$$log;
    /** Command-line argument holder; built from defaults in the constructor and handed to the parser in {@code main}. */
    private final BiWorkflowCliArgs cliArgs;
    /** Wall-clock time captured in the constructor, in whole seconds since the Unix epoch. */
    private final long jobStartTimeInSecs;

    // Creates the single instance when the class is initialized.
    static {
        new BiWorkflow$();
    }

    /**
     * Accessor for the logger created in the constructor.
     *
     * @return the logger held by this singleton
     */
    public Logger com$cloudera$cde$workflow$BiWorkflow$$log() {
        return com$cloudera$cde$workflow$BiWorkflow$$log;
    }

    /**
     * Accessor for the command-line argument holder.
     *
     * @return the {@code BiWorkflowCliArgs} instance created in the constructor
     */
    private BiWorkflowCliArgs cliArgs() {
        return cliArgs;
    }

    /**
     * Accessor for the job start time.
     *
     * @return the epoch-seconds value captured in the constructor
     */
    private long jobStartTimeInSecs() {
        return jobStartTimeInSecs;
    }

    /**
     * Job entry point.
     *
     * <p>Parses the command-line arguments, derives a run id from the job start time,
     * creates a Spark session, and runs {@code processWeatherData} twice: first with a
     * config built with {@code false} (logged as "current" data), then with {@code true}
     * (logged as "forecast" data). The session is stopped even if processing throws.
     *
     * @param strArr raw command-line arguments
     */
    public void main(String[] strArr) {
        final Logger logger = com$cloudera$cde$workflow$BiWorkflow$$log();

        CliArgs$.MODULE$.parseArgs(strArr, cliArgs());
        logger.info("Cli args: " + cliArgs());

        // ISO-8601 instant with ':' swapped for '-'.
        final String runId = Instant.ofEpochSecond(jobStartTimeInSecs()).toString().replace(':', '-');
        logger.info("Run Id: " + runId);

        final SparkSession session = sparkSession();
        try {
            final String zoneId = TimeZone.getDefault().getID();
            logger.info("TimeZone: " + zoneId);
            // The job refuses to run unless the JVM default zone id contains "UTC".
            Predef$.MODULE$.assert(zoneId.contains("UTC"), new BiWorkflow$$anonfun$main$1());

            logger.info("Processing current weather data");
            processWeatherData(session, WorkflowConfig$.MODULE$.apply(false, runId, cliArgs()));

            logger.info("Processing forecast weather data");
            processWeatherData(session, WorkflowConfig$.MODULE$.apply(true, runId, cliArgs()));
        } finally {
            session.stop();
        }
    }

    /**
     * Runs one pass of the pipeline for the given configuration: list input files, read
     * them, add local date/time columns, drop rows that should not be (re)processed,
     * aggregate, write the result, and finally archive the input files unless archiving
     * is disabled on the command line.
     *
     * @param sparkSession   active Spark session
     * @param workflowConfig settings for this pass (input dir, output table, archive dir, ...)
     */
    private void processWeatherData(SparkSession sparkSession, WorkflowConfig workflowConfig) {
        final Logger logger = com$cloudera$cde$workflow$BiWorkflow$$log();

        Seq<FileInfo> inputFiles = weatherDataFiles(sparkSession.sparkContext().hadoopConfiguration(), workflowConfig);
        if (inputFiles.isEmpty()) {
            logger.warn("No weather data files found");
            return;
        }

        Dataset<Row> rawRows = readWeatherDataAvroFiles(sparkSession, inputFiles, workflowConfig);
        Dataset<Row> localized = applyTimezoneCorrection(rawRows);
        Dataset<Row> unprocessed = filterProcessedData(sparkSession, localized);
        Dataset<Row> aggregated = aggregateWeatherData(unprocessed, workflowConfig);

        // Re-order the output columns: the names kept by the first closure, followed by "city".
        // NOTE(review): the closure presumably drops "city" so it ends up last — confirm.
        Object[] keptNames = (Object[]) Predef$.MODULE$.refArrayOps(aggregated.columns())
                .filterNot(new BiWorkflow$$anonfun$1());
        Object[] orderedNames = (Object[]) Predef$.MODULE$.refArrayOps(keptNames).$plus$plus(
                Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"city"})),
                Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class)));
        Column[] projection = (Column[]) Predef$.MODULE$.refArrayOps(orderedNames).map(
                new BiWorkflow$$anonfun$2(),
                Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Column.class)));
        writeAggregatedData(aggregated.select(Predef$.MODULE$.wrapRefArray(projection)), workflowConfig);

        if (cliArgs().skipArchiving()) {
            logger.info("Skipping archiving");
            return;
        }

        // Set of file ids handed to archiveFiles, which uses it to decide which files to skip.
        Set<Object> excludedFileIds;
        if (workflowConfig.isForecastData()) {
            // Forecast pass: empty set.
            excludedFileIds = (Set) Predef$.MODULE$.Set().apply(Nil$.MODULE$);
        } else {
            // Current-data pass: ids of files containing rows whose local date equals the
            // job's start date in that row's timezone.
            Column localDate = localized.apply("local_date");
            Column jobLocalDate = functions$.MODULE$.to_date(functions$.MODULE$.from_unixtime(
                    localized.apply("timezone_offset").$plus(BoxesRunTime.boxToLong(jobStartTimeInSecs()))));
            Dataset<Row> fileIdsForJobDate = localized
                    .filter(localDate.$eq$eq$eq(jobLocalDate))
                    .select("file_id", Predef$.MODULE$.wrapRefArray(new String[0]))
                    .distinct();
            Object[] collectedRows = (Object[]) fileIdsForJobDate.collect();
            int[] fileIds = (int[]) Predef$.MODULE$.refArrayOps(collectedRows).map(
                    new BiWorkflow$$anonfun$3(),
                    Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Int()));
            excludedFileIds = Predef$.MODULE$.intArrayOps(fileIds).toSet();
        }

        archiveFiles(inputFiles, excludedFileIds, workflowConfig.archiveDir(), sparkSession.sparkContext().hadoopConfiguration());
    }

    /**
     * Writes the aggregated rows through the Hive Warehouse Connector data source,
     * partitioned by {@code city}, as parquet, into the table named by the config.
     *
     * @param dataset        aggregated rows to persist
     * @param workflowConfig supplies the save mode and the output table name
     */
    private void writeAggregatedData(Dataset<Row> dataset, WorkflowConfig workflowConfig) {
        dataset.write()
                .format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
                .partitionBy(Predef$.MODULE$.wrapRefArray(new String[]{"city"}))
                .mode(workflowConfig.saveMode())
                .option("table", workflowConfig.outputTable())
                .option("fileformat", "parquet")
                .save();
    }

    /**
     * Creates the archive directory and applies the archiving closure to every input
     * file that the exclusion predicate does not reject.
     *
     * @param seq           input files considered for archiving
     * @param set           file ids passed to the exclusion predicate
     *                      (NOTE(review): presumably ids that must NOT be archived — confirm in the closure)
     * @param str           archive directory path
     * @param configuration Hadoop configuration used to resolve the file system
     */
    private void archiveFiles(Seq<FileInfo> seq, Set<Object> set, String str, Configuration configuration) {
        final Logger logger = com$cloudera$cde$workflow$BiWorkflow$$log();
        logger.info("Archiving processed raw data files");

        final Path archiveRoot = new Path(str);
        logger.info("Creating archive dir: " + archiveRoot);

        // mkdirs must report success; otherwise the assertion fails with the closure's message.
        boolean created = archiveRoot.getFileSystem(configuration).mkdirs(archiveRoot);
        Predef$.MODULE$.assert(created, new BiWorkflow$$anonfun$archiveFiles$1(archiveRoot));

        IterableLike toArchive = (IterableLike) seq.filterNot(new BiWorkflow$$anonfun$archiveFiles$2(set));
        toArchive.foreach(new BiWorkflow$$anonfun$archiveFiles$3(configuration, archiveRoot));
    }

    /**
     * Drops rows that should not be aggregated in this run.
     *
     * <p>Steps: keep rows with non-null {@code city} and {@code local_date}; drop rows whose
     * {@code local_date} equals the job start date shifted by the row's {@code timezone_offset};
     * and, when the weather table already exists, keep only rows whose {@code local_date} is
     * later than that city's maximum {@code local_date} in the table (or whose city has no
     * rows there).
     *
     * @param sparkSession active Spark session, used for the catalog lookup and table read
     * @param dataset      rows carrying {@code city}, {@code local_date} and {@code timezone_offset}
     * @return the filtered rows, with the same columns as the input
     */
    private Dataset<Row> filterProcessedData(SparkSession sparkSession, Dataset<Row> dataset) {
        Column hasCityAndDate = dataset.apply("city").isNotNull().and(dataset.apply("local_date").isNotNull());
        Dataset<Row> complete = dataset.filter(hasCityAndDate);

        Column localDate = complete.apply("local_date");
        Column jobLocalDate = functions$.MODULE$.to_date(functions$.MODULE$.from_unixtime(
                complete.apply("timezone_offset").$plus(BoxesRunTime.boxToLong(jobStartTimeInSecs()))));
        Dataset<Row> notJobDate = complete.filter(localDate.$eq$bang$eq(jobLocalDate));

        String weatherTable = cliArgs().weatherTable();
        if (sparkSession.catalog().tableExists(weatherTable)) {
            // Latest date already stored per city.
            Dataset<Row> latestPerCity = sparkSession.table(weatherTable)
                    .groupBy("city", Predef$.MODULE$.wrapRefArray(new String[0]))
                    .agg(functions$.MODULE$.max("local_date").as("max_local_date"), Predef$.MODULE$.wrapRefArray(new Column[0]));

            Column sameCity = notJobDate.apply("city").$eq$eq$eq(latestPerCity.apply("city"));
            Column newerThanStored = latestPerCity.apply("max_local_date").isNull()
                    .or(notJobDate.apply("local_date").$greater(latestPerCity.apply("max_local_date")));

            // Left outer join so cities absent from the table survive (max_local_date is null).
            return notJobDate.alias("a")
                    .join(latestPerCity, sameCity, "LEFT_OUTER")
                    .filter(newerThanStored)
                    .select("a.*", Predef$.MODULE$.wrapRefArray(new String[0]));
        }
        return notJobDate;
    }

    /**
     * Adds {@code local_timestamp} and {@code local_date} columns, both derived from
     * {@code utc_time + timezone_offset} via {@code from_unixtime}.
     *
     * @param dataset rows carrying {@code utc_time} and {@code timezone_offset}
     * @return the input with the two extra columns
     */
    private Dataset<Row> applyTimezoneCorrection(Dataset<Row> dataset) {
        // Both derived columns share the same shifted-epoch expression.
        Column shiftedEpoch = dataset.apply("utc_time").$plus(dataset.apply("timezone_offset"));
        Column localTimestamp = functions$.MODULE$.from_unixtime(shiftedEpoch);
        Column localDate = functions$.MODULE$.to_date(localTimestamp);

        Dataset<Row> withDate = dataset.withColumn("local_date", localDate);
        return withDate.withColumn("local_timestamp", localTimestamp);
    }

    /**
     * Maps every input file through the per-file reader closure and reduces the results
     * into a single dataset.
     *
     * <p>NOTE(review): the closures are not visible here; presumably the first loads one
     * file restricted to the listed columns and the second unions two datasets — confirm.
     * {@code reduce} requires a non-empty {@code seq}; the visible caller checks for that.
     *
     * @param sparkSession   active Spark session, forwarded to the reader closure
     * @param seq            files to read
     * @param workflowConfig supplies the snow/rain hour bucket used in two column names
     * @return the combined dataset
     */
    private Dataset<Row> readWeatherDataAvroFiles(SparkSession sparkSession, Seq<FileInfo> seq, WorkflowConfig workflowConfig) {
        final String bucket = workflowConfig.snowRainHourBucket();
        final String[] wantedColumns = {
            "city",
            "temperature",
            "feels_like_temperature",
            "humidity",
            "wind_speed",
            "pressure",
            "snow_" + bucket,
            "rain_" + bucket,
            "utc_time",
            "timezone_offset",
            "file_id"
        };

        TraversableOnce perFile = (TraversableOnce) seq.map(
                new BiWorkflow$$anonfun$readWeatherDataAvroFiles$1(
                        sparkSession,
                        workflowConfig,
                        Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(wantedColumns)),
                        Predef$.MODULE$.Set().apply(Nil$.MODULE$)),
                Seq$.MODULE$.canBuildFrom());
        return (Dataset) perFile.reduce(new BiWorkflow$$anonfun$readWeatherDataAvroFiles$2());
    }

    /**
     * Builds the column expression {@code (column - 273.15) * 1.8 + 32.0}, cast to double.
     *
     * @param column temperature column, taken to be in Kelvin per the method name
     * @return the converted column expression
     */
    private Column kelvinToFahrenheit(Column column) {
        Column offsetRemoved = column.$minus(BoxesRunTime.boxToDouble(273.15d));
        Column scaled = offsetRemoved.$times(BoxesRunTime.boxToDouble(1.8d));
        Column shifted = scaled.$plus(BoxesRunTime.boxToDouble(32.0d));
        return shifted.cast(DoubleType$.MODULE$);
    }

    /**
     * Aggregates rows per ({@code city}, {@code local_date}).
     *
     * <p>Adds Fahrenheit versions of the two temperature columns, then computes avg/min/max
     * (each cast to double) for temperature, feels-like temperature, humidity, pressure,
     * wind speed, and the bucketed snow and rain columns. A {@code creation_timestamp}
     * column holding the job start time, cast to timestamp, is appended.
     *
     * @param dataset        rows to aggregate
     * @param workflowConfig supplies the snow/rain hour bucket used in column names
     * @return one row per city and local date with the aggregate columns
     */
    private Dataset<Row> aggregateWeatherData(Dataset<Row> dataset, WorkflowConfig workflowConfig) {
        final String bucket = workflowConfig.snowRainHourBucket();
        final String snow = "snow_" + bucket;
        final String rain = "rain_" + bucket;

        // {source column, suffix appended to "avg_" / "min_" / "max_"}.
        // Row order here is the order of the aggregate columns in the result.
        final String[][] metrics = {
            {"temperature", "temp_k"},
            {"temperature_f", "temp_f"},
            {"feels_like_temperature", "feels_like_temp_k"},
            {"feels_like_temperature_f", "feels_like_temp_f"},
            {"humidity", "humidity"},
            {"pressure", "pressure"},
            {"wind_speed", "wind_speed"},
            {snow, snow},
            {rain, rain}
        };

        final Column[] stats = new Column[metrics.length * 3];
        int next = 0;
        for (String[] metric : metrics) {
            final String source = metric[0];
            final String suffix = metric[1];
            stats[next++] = functions$.MODULE$.avg(source).cast(DoubleType$.MODULE$).as("avg_" + suffix);
            stats[next++] = functions$.MODULE$.min(source).cast(DoubleType$.MODULE$).as("min_" + suffix);
            stats[next++] = functions$.MODULE$.max(source).cast(DoubleType$.MODULE$).as("max_" + suffix);
        }

        // agg takes the first expression separately from the remaining ones.
        final Column[] remaining = new Column[stats.length - 1];
        System.arraycopy(stats, 1, remaining, 0, remaining.length);

        Dataset<Row> withFahrenheit = dataset
                .withColumn("temperature_f", kelvinToFahrenheit(dataset.apply("temperature")))
                .withColumn("feels_like_temperature_f", kelvinToFahrenheit(dataset.apply("feels_like_temperature")));

        Dataset<Row> perCityAndDate = withFahrenheit
                .groupBy("city", Predef$.MODULE$.wrapRefArray(new String[]{"local_date"}))
                .agg(stats[0], Predef$.MODULE$.wrapRefArray(remaining));

        Column creationTimestamp = functions$.MODULE$
                .typedLit(BoxesRunTime.boxToLong(jobStartTimeInSecs()), package$.MODULE$.universe().TypeTag().Long())
                .cast(TimestampType$.MODULE$);
        return perCityAndDate.withColumn("creation_timestamp", creationTimestamp);
    }

    /**
     * Builds (or reuses) the Spark session for this job.
     *
     * <p>Always sets the Kryo registrator, the SQL extensions class and the Hive Warehouse
     * Connector read mode. When the CLI arguments request local mode, it additionally
     * selects the Hive catalog implementation, uses the {@code local} master, and sets the
     * log level to INFO after the session exists.
     *
     * @return the created or pre-existing Spark session
     */
    private SparkSession sparkSession() {
        final Logger logger = com$cloudera$cde$workflow$BiWorkflow$$log();
        logger.info("Creating spark session");

        final boolean local = cliArgs().isLocal();

        // The key must be spelled "spark.kryo.registrator"; the previous "spark.kyro.registrator"
        // is not a key Spark reads, so the registrator was never applied. The registrator
        // class name really does contain "Kyro" and must stay as written.
        SparkSession.Builder builder = SparkSession$.MODULE$.builder()
                .appName("BiWorkflow")
                .config("spark.kryo.registrator", "com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator")
                .config("spark.sql.extensions", "com.hortonworks.spark.sql.rule.Extensions")
                .config("spark.datasource.hive.warehouse.read.mode", "DIRECT_READER_V1");

        if (local) {
            builder.config("spark.sql.catalogImplementation", "hive");
            builder.master("local");
        }

        SparkSession session = builder.getOrCreate();
        if (local) {
            session.sparkContext().setLogLevel("INFO");
        }

        logger.info("Spark session created");
        return session;
    }

    /**
     * Lists the files directly under the configured weather data root directory
     * (non-recursive) and converts each (path, index) pair into a {@code FileInfo}
     * through the mapping closure.
     *
     * @param configuration  Hadoop configuration used to resolve the file system
     * @param workflowConfig supplies the root directory to list
     * @return one {@code FileInfo} per file found, in listing order
     */
    private Seq<FileInfo> weatherDataFiles(Configuration configuration, WorkflowConfig workflowConfig) {
        final Path rootDir = new Path(workflowConfig.weatherDataRootDir());
        final RemoteIterator<LocatedFileStatus> listing = rootDir.getFileSystem(configuration).listFiles(rootDir, false);

        // Raw ArrayBuffer on purpose: the Scala builder types do not line up with Java generics.
        final ArrayBuffer filePaths = new ArrayBuffer();
        while (listing.hasNext()) {
            final LocatedFileStatus status = listing.next();
            final String location = status.getPath().toString();
            filePaths.append(Predef$.MODULE$.wrapRefArray(new String[]{location}));
        }

        // Pair every path with its position, then let the closure build the FileInfo.
        final TraversableLike indexedPaths = (TraversableLike) filePaths.zipWithIndex(ArrayBuffer$.MODULE$.canBuildFrom());
        return (Seq) indexedPaths.map(new BiWorkflow$$anonfun$weatherDataFiles$1(), ArrayBuffer$.MODULE$.canBuildFrom());
    }

    /**
     * Visits a subset of the dataset's schema fields with a closure that may replace the
     * result through a non-local return.
     *
     * <p>The fields are selected by a predicate closure that receives {@code set}. The
     * per-index closure is given the dataset, {@code set}, the selected fields, an array of
     * strings mapped from those fields, and a private marker object. If that closure throws a
     * {@code NonLocalReturnControl} carrying the marker, its value is returned; otherwise the
     * input dataset is returned unchanged.
     *
     * <p>NOTE(review): the closures are not visible here; going by the method name they
     * presumably expand nested columns, with {@code set} naming columns to leave alone — confirm.
     *
     * @param dataset dataset whose schema is inspected
     * @param set     string set forwarded to the field predicate and the per-index closure
     * @return the value carried by the non-local return, or {@code dataset} if none occurred
     */
    public Dataset<Row> com$cloudera$cde$workflow$BiWorkflow$$flattenDataFrame(Dataset<Row> dataset, Set<String> set) {
        // Unique marker identifying non-local returns that belong to this invocation.
        Object obj = new Object();
        try {
            // Schema fields accepted by the predicate closure.
            StructField[] structFieldArr = (StructField[]) Predef$.MODULE$.refArrayOps(dataset.schema().fields()).filter(new BiWorkflow$$anonfun$4(set));
            // Run the closure for every index of the selected fields; it may exit early by throwing.
            Predef$.MODULE$.refArrayOps(structFieldArr).indices().foreach$mVc$sp(new BiWorkflow$$anonfun$com$cloudera$cde$workflow$BiWorkflow$$flattenDataFrame$1(dataset, set, structFieldArr, (String[]) Predef$.MODULE$.refArrayOps(structFieldArr).map(new BiWorkflow$$anonfun$5(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))), obj));
            // No early exit happened: hand back the input as is.
            return dataset;
        } catch (NonLocalReturnControl e) {
            // Only unwrap control exceptions raised for this call's marker.
            if (e.key() == obj) {
                return (Dataset) e.value();
            }
            // Belongs to some other frame; let it propagate.
            throw e;
        }
    }

    private BiWorkflow$() {
        MODULE$ = this;
        this.com$cloudera$cde$workflow$BiWorkflow$$log = LoggerFactory.getLogger(getClass());
        this.cliArgs = new BiWorkflowCliArgs(BiWorkflowCliArgs$.MODULE$.apply$default$1(), BiWorkflowCliArgs$.MODULE$.apply$default$2(), BiWorkflowCliArgs$.MODULE$.apply$default$3(), BiWorkflowCliArgs$.MODULE$.apply$default$4(), BiWorkflowCliArgs$.MODULE$.apply$default$5());
        this.jobStartTimeInSecs = System.currentTimeMillis() / 1000;
    }
}
