
amazon s3 - Write the final dataset output for Spark Java into S3

I am not able to find the correct way to write a Spark Dataset to S3 from Java. What additional configuration do I need? Do I have to set the AWS credentials in my code, or will Spark pick them up from the local .aws/ profile?

Please guide.
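On the credentials question: by default the S3A connector does not read ~/.aws/credentials; its standard chain checks the fs.s3a.* keys, environment variables, and EC2 instance metadata. A minimal sketch for pointing it at the local profile file instead, assuming the v1 AWS SDK that ships alongside the hadoop-aws module:

    // Sketch only: fs.s3a.aws.credentials.provider is a standard S3A key;
    // ProfileCredentialsProvider (AWS SDK v1) reads ~/.aws/credentials.
    sparkSession.sparkContext().hadoopConfiguration().set(
            "fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.profile.ProfileCredentialsProvider");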

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class sparkSqlMysql {

private static final org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger.getLogger(sparkSqlMysql.class);

private static final SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Spark2JdbcDs")
        .getOrCreate();

public static void main(String[] args) {
    // JDBC connection properties

    final Properties connectionProperties = new Properties();
    connectionProperties.put("user", "root");
    connectionProperties.put("password", "password");
    connectionProperties.put("driver", "com.mysql.jdbc.Driver");
    final String dbTable = "(select * from Fielding) t";
    final String dbTable1 = "(select * from Salaries) m";
    final String dbTable2 = "(select * from Pitching) n";

    // Load MySQL query results as Datasets

    Dataset<Row> jdbcDF2 = sparkSession.read().jdbc("jdbc:mysql://localhost:3306/lahman2016", dbTable,
            connectionProperties);
    Dataset<Row> jdbcDF3 = sparkSession.read().jdbc("jdbc:mysql://localhost:3306/lahman2016", dbTable1,
            connectionProperties);

    Dataset<Row> jdbcDF4 = sparkSession.read().jdbc("jdbc:mysql://localhost:3306/lahman2016", dbTable2,
            connectionProperties);

    jdbcDF2.createOrReplaceTempView("Fielding");
    jdbcDF3.createOrReplaceTempView("Salaries");
    jdbcDF4.createOrReplaceTempView("Pitching");

    // Average salary per year for players with Fielding records.
    // Note: LIMIT without ORDER BY returns an arbitrary 5 years.
    Dataset<Row> sqlDF = sparkSession.sql(
            "select Salaries.yearID, avg(Salaries.salary) as Fielding from Salaries inner join Fielding ON Salaries.yearID = Fielding.yearID AND Salaries.playerID = Fielding.playerID group by Salaries.yearID limit 5");

    Dataset<Row> sqlDF1 = sparkSession.sql(
            "select Salaries.yearID, avg(Salaries.salary) as Pitching from Salaries inner join Pitching ON Salaries.yearID = Pitching.yearID  AND Salaries.playerID = Pitching.playerID group by Salaries.yearID limit 5");

    // sqlDF.show();

    // sqlDF1.show();

    sqlDF.createOrReplaceTempView("avg_fielding");

    sqlDF1.createOrReplaceTempView("avg_pitching");

    Dataset<Row> final_query_1_output = sparkSession.sql(
            "select avg_fielding.yearID, avg_fielding.Fielding, avg_pitching.Pitching from avg_fielding inner join  avg_pitching ON avg_pitching.yearID = avg_fielding.yearID");

    final_query_1_output.show();

    sparkSession.stop();
}   // end main()
}   // end class

The output of the query is:

+------+------------------+------------------+
|yearID|          Fielding|          Pitching|
+------+------------------+------------------+
|  1990|  507978.625320787| 485947.2487437186|
|  2003|2216200.9609838845|2133800.1867612293|
|  2007|2633213.0126475547|2617533.3393665156|
|  2015|3996199.5729421354| 3955581.121535181|
|  2006| 2565803.492487479| 2534756.866972477|
+------+------------------+------------------+

I want to write this dataset to S3: how can I do that?

    final_query_1_output.write().mode("overwrite").save("s3n://druids3migration/data.csv");
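One way that should work, as a sketch rather than a verified answer: use the s3a:// scheme (s3n is deprecated and has been removed from recent Hadoop releases), and pick an explicit output format, because .save() without .format(...) writes Parquet by default. This assumes the hadoop-aws module and its matching AWS SDK bundle are on the classpath:

    // Hedged sketch: the inline keys are placeholders for illustration;
    // prefer the profile/provider-chain approach shown above.
    org.apache.hadoop.conf.Configuration hadoopConf =
            sparkSession.sparkContext().hadoopConfiguration();
    hadoopConf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY");
    hadoopConf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY");

    // Spark writes a directory of part files at this path, not a single
    // data.csv file; coalesce(1) is an optional way to get one part file.
    final_query_1_output
            .coalesce(1)
            .write()
            .mode("overwrite")
            .option("header", "true")
            .csv("s3a://druids3migration/data.csv");

If Parquet output is acceptable, keeping write().mode("overwrite").save(...) with an s3a:// path and the same credentials setup is the simpler route.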


1 Answer

Waiting for an expert to reply.
