2023年8月3日发(作者:)

SparkSql批量插⼊或更新保存数据到Pgsql在sparksql 中,保存数据到数据,只有 Append , Overwrite , ErrorIfExists, Ignore 四种模式,不满⾜项⽬需求 ,现依据 spark save 源码,进⾏进⼀步的改造, 批量保存数据,存在则更新 不存在 则插⼊** *测试⽤例 * 批量保存数据,存在则更新 不存在 则插⼊ * INSERT INTO test_001 VALUES( ?, ?, ? ) * ON conflict ( ID ) DO * UPDATE SET id=?,NAME = ?,age = ?; * @author linzhy */object InsertOrUpdateTest { def main(args: Array[String]): Unit = { val spark = r() .appName(pleName) .master("local[2]") .config("tringFields","100") .getOrCreate() var config = () val ods_url = ing("d_") val ods_user = ing("d_") val ods_password = ing("d_rd") val test_001 = ("jdbc") .option("url", ods_url) .option("dbtable", "test_001") .option("user", ods_user) .option("password", ods_password) .load() test_OrReplaceTempView("test_001") val sql= """ |SELECT * FROM test_001 |""".stripMargin val dataFrame = (sql) //批量保存数据,存在则更新

不存在

则插⼊ OrUpdateToPgsql(dataFrame,ontext,"test_001_copy1","id") (); }}insertOrUpdateToPgsql ⽅法源码/** *

批量插入

或更新

数据,该方法

借鉴()

源码 * @param dataFrame * @param sc * @param table * @param id */ def insertOrUpdateToPgsql(dataFrame:DataFrame,sc:SparkContext,table:String,id:String): Unit ={

val tableSchema = val columns =(x => ).mkString(",") val placeholders = (_ => "?").mkString(",") val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders) on conflict($id) do update set " val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders) on conflict($id) do update set " val update = (x => ng + "=?" ).mkString(",") val realsql =(update) val conn =connectionPool() oCommit(false) val dialect = () val broad_ps = ast(eStatement(realsql)) val numFields = *2 //调⽤spark中⾃带的函数

或者

捞出来,获取属性字段与字段类型 val nullTypes = (f => getJdbcType(pe, dialect).jdbcNullType) val setters = (f => makeSetter(conn,pe))

var rowCount = 0 val batchSize = 2000 val updateindex = numFields / 2 try { hPartition(iterator =>{ //遍历批量提交 val ps = broad_ try{ while (t) { val row = () var i = 0 while (i < numFields) { i < updateindex match { case true =>{ if (At(i)) { l(i + 1, nullTypes(i)) } else { setters(i).apply(ps, row, i,0) } } case false =>{ if (At(i-updateindex)) { l(i + 1, nullTypes(i-updateindex)) } else { setters(i-updateindex).apply(ps, row, i,updateindex) } } } i = i + 1 } ch() rowCount += 1 if (rowCount % batchSize == 0) { eBatch() rowCount = 0 } } if (rowCount > 0) { eBatch() } }finally { () } }) () }catch { case e: Exception => logError("Error in execution of insert. " + sage) ck() // insertError(connectionPool("OuCloud_ODS"),"insertOrUpdateToPgsql",sage) // insertError(connectionPool("OuCloud_ODS"),"insertOrUpdateToPgsql",sage) }finally { () } }从spark 源码中捞出 getJdbcType /makeSetter函数 private def getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = { CType(dt).orElse(getCommonJDBCType(dt)).getOrElse( throw new IllegalArgumentException(s"Can't get JDBC type for ${gString}")) } private type JDBCValueSetter_add = (PreparedStatement, Row, Int,Int) => Unit

/**
 * Builds the function that binds one non-null column value of the given
 * Spark SQL type to a JDBC `PreparedStatement`.
 *
 * The returned setter takes four arguments:
 *  - `stmt`: the statement to bind into;
 *  - `row`: the row the value is read from;
 *  - `pos`: zero-based placeholder index (JDBC parameters are one-based, hence
 *    the `pos + 1` on every bind);
 *  - `currentpos`: offset subtracted from `pos` to obtain the column index in
 *    `row`. The upsert statement lists every column twice (INSERT values, then
 *    UPDATE SET values), so the second half of the placeholders is bound with
 *    a non-zero offset to read the same row columns again.
 *
 * NOTE(review): the method calls below were reconstructed from the matched
 * data type and the identifier fragments that survived in the published
 * listing; they follow the shape of Spark's own JDBC value setters. Confirm
 * against the original project source.
 *
 * @param conn     JDBC connection; not used by any of the cases handled here
 * @param dataType Spark SQL type of the column to bind
 * @return setter for that type; for an unsupported type the returned setter
 *         throws `IllegalArgumentException` when it is invoked, not when it is built
 */
private def makeSetter(conn: Connection, dataType: DataType): JDBCValueSetter_add = dataType match {
  case IntegerType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setInt(pos + 1, row.getInt(pos - currentpos))

  case LongType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setLong(pos + 1, row.getLong(pos - currentpos))

  case DoubleType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setDouble(pos + 1, row.getDouble(pos - currentpos))

  case FloatType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setFloat(pos + 1, row.getFloat(pos - currentpos))

  case ShortType =>
    // Bound as an int, matching the upstream Spark setter for ShortType.
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setInt(pos + 1, row.getShort(pos - currentpos))

  case ByteType =>
    // Bound as an int, matching the upstream Spark setter for ByteType.
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setInt(pos + 1, row.getByte(pos - currentpos))

  case BooleanType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setBoolean(pos + 1, row.getBoolean(pos - currentpos))

  case StringType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setString(pos + 1, row.getString(pos - currentpos))

  case BinaryType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos - currentpos))

  case TimestampType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos - currentpos))

  case DateType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos - currentpos))

  case t: DecimalType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setBigDecimal(pos + 1, row.getDecimal(pos - currentpos))

  case _ =>
    // Unsupported type: fail only if a non-null value actually has to be bound.
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      throw new IllegalArgumentException(
        s"Can't translate non-null value for field $pos")
}

2023年8月3日发(作者:)

SparkSql批量插⼊或更新保存数据到Pgsql在sparksql 中,保存数据到数据,只有 Append , Overwrite , ErrorIfExists, Ignore 四种模式,不满⾜项⽬需求 ,现依据 spark save 源码,进⾏进⼀步的改造, 批量保存数据,存在则更新 不存在 则插⼊** *测试⽤例 * 批量保存数据,存在则更新 不存在 则插⼊ * INSERT INTO test_001 VALUES( ?, ?, ? ) * ON conflict ( ID ) DO * UPDATE SET id=?,NAME = ?,age = ?; * @author linzhy */object InsertOrUpdateTest { def main(args: Array[String]): Unit = { val spark = r() .appName(pleName) .master("local[2]") .config("tringFields","100") .getOrCreate() var config = () val ods_url = ing("d_") val ods_user = ing("d_") val ods_password = ing("d_rd") val test_001 = ("jdbc") .option("url", ods_url) .option("dbtable", "test_001") .option("user", ods_user) .option("password", ods_password) .load() test_OrReplaceTempView("test_001") val sql= """ |SELECT * FROM test_001 |""".stripMargin val dataFrame = (sql) //批量保存数据,存在则更新

不存在

则插⼊ OrUpdateToPgsql(dataFrame,ontext,"test_001_copy1","id") (); }}insertOrUpdateToPgsql ⽅法源码/** *

批量插入

或更新

数据,该方法

借鉴()

源码 * @param dataFrame * @param sc * @param table * @param id */ def insertOrUpdateToPgsql(dataFrame:DataFrame,sc:SparkContext,table:String,id:String): Unit ={

val tableSchema = val columns =(x => ).mkString(",") val placeholders = (_ => "?").mkString(",") val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders) on conflict($id) do update set " val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders) on conflict($id) do update set " val update = (x => ng + "=?" ).mkString(",") val realsql =(update) val conn =connectionPool() oCommit(false) val dialect = () val broad_ps = ast(eStatement(realsql)) val numFields = *2 //调⽤spark中⾃带的函数

或者

捞出来,获取属性字段与字段类型 val nullTypes = (f => getJdbcType(pe, dialect).jdbcNullType) val setters = (f => makeSetter(conn,pe))

var rowCount = 0 val batchSize = 2000 val updateindex = numFields / 2 try { hPartition(iterator =>{ //遍历批量提交 val ps = broad_ try{ while (t) { val row = () var i = 0 while (i < numFields) { i < updateindex match { case true =>{ if (At(i)) { l(i + 1, nullTypes(i)) } else { setters(i).apply(ps, row, i,0) } } case false =>{ if (At(i-updateindex)) { l(i + 1, nullTypes(i-updateindex)) } else { setters(i-updateindex).apply(ps, row, i,updateindex) } } } i = i + 1 } ch() rowCount += 1 if (rowCount % batchSize == 0) { eBatch() rowCount = 0 } } if (rowCount > 0) { eBatch() } }finally { () } }) () }catch { case e: Exception => logError("Error in execution of insert. " + sage) ck() // insertError(connectionPool("OuCloud_ODS"),"insertOrUpdateToPgsql",sage) // insertError(connectionPool("OuCloud_ODS"),"insertOrUpdateToPgsql",sage) }finally { () } }从spark 源码中捞出 getJdbcType /makeSetter函数 private def getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = { CType(dt).orElse(getCommonJDBCType(dt)).getOrElse( throw new IllegalArgumentException(s"Can't get JDBC type for ${gString}")) } private type JDBCValueSetter_add = (PreparedStatement, Row, Int,Int) => Unit

/**
 * Builds the function that binds one non-null column value of the given
 * Spark SQL type to a JDBC `PreparedStatement`.
 *
 * The returned setter takes four arguments:
 *  - `stmt`: the statement to bind into;
 *  - `row`: the row the value is read from;
 *  - `pos`: zero-based placeholder index (JDBC parameters are one-based, hence
 *    the `pos + 1` on every bind);
 *  - `currentpos`: offset subtracted from `pos` to obtain the column index in
 *    `row`. The upsert statement lists every column twice (INSERT values, then
 *    UPDATE SET values), so the second half of the placeholders is bound with
 *    a non-zero offset to read the same row columns again.
 *
 * NOTE(review): the method calls below were reconstructed from the matched
 * data type and the identifier fragments that survived in the published
 * listing; they follow the shape of Spark's own JDBC value setters. Confirm
 * against the original project source.
 *
 * @param conn     JDBC connection; not used by any of the cases handled here
 * @param dataType Spark SQL type of the column to bind
 * @return setter for that type; for an unsupported type the returned setter
 *         throws `IllegalArgumentException` when it is invoked, not when it is built
 */
private def makeSetter(conn: Connection, dataType: DataType): JDBCValueSetter_add = dataType match {
  case IntegerType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setInt(pos + 1, row.getInt(pos - currentpos))

  case LongType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setLong(pos + 1, row.getLong(pos - currentpos))

  case DoubleType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setDouble(pos + 1, row.getDouble(pos - currentpos))

  case FloatType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setFloat(pos + 1, row.getFloat(pos - currentpos))

  case ShortType =>
    // Bound as an int, matching the upstream Spark setter for ShortType.
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setInt(pos + 1, row.getShort(pos - currentpos))

  case ByteType =>
    // Bound as an int, matching the upstream Spark setter for ByteType.
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setInt(pos + 1, row.getByte(pos - currentpos))

  case BooleanType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setBoolean(pos + 1, row.getBoolean(pos - currentpos))

  case StringType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setString(pos + 1, row.getString(pos - currentpos))

  case BinaryType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos - currentpos))

  case TimestampType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos - currentpos))

  case DateType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos - currentpos))

  case t: DecimalType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setBigDecimal(pos + 1, row.getDecimal(pos - currentpos))

  case _ =>
    // Unsupported type: fail only if a non-null value actually has to be bound.
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      throw new IllegalArgumentException(
        s"Can't translate non-null value for field $pos")
}