kdata

阅读 / 问答 / 标签

sparkdataframe转换成字节流

本文介绍基于Spark(2.0+)的Json字符串和DataFrame相互转换。json字符串转DataFramespark提供了将json字符串解析为DF的接口,如果不指定生成的DF的schema,默认spark会先扫码一遍给的json字符串,然后推断生成DF的schema:* 若列数据全为null会用String类型* 整数默认会用Long类型* 浮点数默认会用Double类型 val json1 = """{"a":null, "b": 23.1, "c": 1}""" val json2 ="""{"a":null, "b": "hello", "d": 1.2}""" val ds =spark.createDataset(Seq(json1, json2))val df = spark.read.json(ds) df.showdf.printSchema +----+-----+----+----+ | a| b| c| d| +----+-----+----+----+ |null|23.1| 1|null| |null|hello|null| 1.2| +----+-----+----+----+ root |-- a: string(nullable =true) |-- b: string (nullable = true) |-- c: long (nullable = true)|-- d: double (nullable =true)若指定schema会按照schema生成DF:* schema中不存在的列会被忽略* 可以用两种方法指定schema,StructType和String,具体对应关系看后面*若数据无法匹配schema中类型:若schema中列允许为null会转为null;若不允许为null会转为相应类型的空值(如Double类型为0.0值),若无法转换为值会抛出异常val schema = StructType(List( StructField("a", ByteType, true), StructField("b", FloatType,false), StructField("c", ShortType, true) )) //或 val schema = "bfloat, c short" val df = spark.read.schema(schema).json(ds) df.showdf.printSchema +----+----+----+ | a| b| c| +----+----+----+ |null|23.1| 1| |null|0|null| +----+----+----+ root |-- a: byte (nullable = true) |-- b: float(nullable =true) |-- c: short (nullable = true)json解析相关配置参数primitivesAsString (default false): 把所有列看作string类型prefersDecimal(default false): 将小数看作decimal,如果不匹配decimal,就看做doubles.allowComments (default false): 忽略json字符串中Java/C++风格的注释allowUnquotedFieldNames (default false): 允许不加引号的列名allowSingleQuotes (default true): 除双引号外,还允许用单引号allowNumericLeadingZeros (default false): 允许数字中额外的前导0(如0012)allowBackslashEscapingAnyCharacter (default false): 允许反斜杠机制接受所有字符allowUnquotedControlChars (default false):允许JSON字符串包含未加引号的控制字符(值小于32的ASCII字符,包括制表符和换行字符)。mode (default PERMISSIVE): 允许在解析期间处理损坏记录的模式。PERMISSIVE:当遇到损坏的记录时,将其他字段设置为null,并将格式错误的字符串放入由columnNameOfCorruptRecord配置的字段中。若指定schema,在schema中设置名为columnNameOfCorruptRecord的字符串类型字段。如果schema中不具有该字段,则会在分析过程中删除损坏的记录。若不指定schema(推断模式),它会在输出模式中隐式添加一个columnNameOfCorruptRecord字段。DROPMALFORMED : 忽略整条损害记录FAILFAST : 遇到损坏记录throws an exceptioncolumnNameOfCorruptRecord(默认值为spark.sql.columnNameOfCorruptRecord的值):允许PERMISSIVEmode添加的新字段,会重写spark.sql.columnNameOfCorruptRecorddateFormat (default yyyy-MM-dd): 自定义日期格式,遵循java.text.SimpleDateFormat格式.只有日期部分(无详细时间)timestampFormat (default yyyy-MM-dd"T"HH:mm:ss.SSSXXX):自定义日期格式,遵循java.text.SimpleDateFormat格式. 可以有详细时间部分(到微秒)multiLine (default false): 解析一个记录,该记录可能跨越多行,每个文件以上参数可用option方法配置:val stringDF = spark.read.option("primitivesAsString", "true").json(ds)stringDF.show stringDF.printSchema +----+-----+----+----+ | a| b| c| d|+----+-----+----+----+ |null| 23.1| 1|null| |null|hello|null| 1.2|+----+-----+----+----+ root |-- a: string (nullable =true) |-- b: string(nullable =true) |-- c: string (nullable = true) |-- d: string (nullable = true)二进制类型会自动用base64编码方式表示‘Man"(ascci) base64编码后为:”TWFu”val byteArr = Array("M".toByte, "a".toByte, "n".toByte) val binaryDs =spark.createDataset(Seq(byteArr))val dsWithB64 = binaryDs.withColumn("b64",base64(col("value"))) dsWithB64.show(false) dsWithB64.printSchema+----------+----+ |value |b64 | +----------+----+ |[4D 61 6E]|TWFu|+----------+----+ root |-- value: binary (nullable =true) |-- b64: string(nullable =true) //=================================================dsWithB64.toJSON.show(false) +-----------------------------+ |value |+-----------------------------+ |{"value":"TWFu","b64":"TWFu"}|+-----------------------------+//================================================= val json ="""{"value":"TWFu"}""" val jsonDs = spark.createDataset(Seq(json)) val binaryDF= spark.read.schema("value binary").json(jsonDs ) binaryDF.showbinaryDF.printSchema +----------+ | value| +----------+ |[4D 61 6E]|+----------+ root |-- value: binary (nullable =true)指定schema示例:以下是Spark SQL支持的所有基本类型:val json = """{"stringc":"abc", "shortc":1, "integerc":null, "longc":3,"floatc":4.5, "doublec":6.7, "decimalc":8.90, "booleanc":true, "bytec":23,"binaryc":"TWFu", "datec":"2010-01-01", "timestampc":"2012-12-1211:22:22.123123"}""" val ds = spark.createDataset(Seq(json)) val schema ="stringc string, shortc short, integerc int, longc long, floatc float, doublecdouble, decimalc decimal(10, 3), booleanc boolean, bytec byte, binaryc binary,datec date, timestampc timestamp" val df = spark.read.schema(schema).json(ds)df.show(false) df.printSchema+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+|stringc|shortc|integerc|longc|floatc|doublec|decimalc|booleanc|bytec|binaryc|datec |timestampc |+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+|abc |1 |null |3 |4.5 |6.7 |8.900 |true |23 |[4D 61 6E]|2010-01-01|2012-12-12 11:22:22.123|+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+root |-- stringc: string (nullable =true) |-- shortc: short (nullable = true)|-- integerc: integer (nullable =true) |-- longc: long (nullable = true) |--floatc: float (nullable =true) |-- doublec: double (nullable = true) |--decimalc: decimal(10,3) (nullable = true) |-- booleanc: boolean (nullable = true) |-- bytec: byte (nullable =true) |-- binaryc: binary (nullable = true) |--datec: date (nullable =true) |-- timestampc: timestamp (nullable = true)复合类型:val json = """ { "arrayc" : [ 1, 2, 3 ], "structc" : { "strc" : "efg","decimalc" : 1.1 }, "mapc" : { "key1" : 1.2, "key2" : 1.1 } } """ val ds =spark.createDataset(Seq(json))val schema = "arrayc array, structcstruct, mapc map" val df =spark.read.schema(schema).json(ds) df.show(false) df.printSchema+---------+--------+--------------------------+ |arrayc |structc |mapc |+---------+--------+--------------------------+ |[1, 2, 3]|[efg, 1]|[key1 -> 1.2, key2 ->1.1]| +---------+--------+--------------------------+ root |-- arrayc:array (nullable =true) | |-- element: short (containsNull = true) |-- structc:struct (nullable =true) | |-- strc: string (nullable = true) | |-- decimalc:decimal(10,0) (nullable = true) |-- mapc: map (nullable = true) | |-- key:string | |-- value: float (valueContainsNull =true)SparkSQL数据类型基本类型:DataType simpleString typeName sql defaultSize catalogString jsonStringType string string STRING 20 string “string”ShortType smallint short SMALLINT 2 smallint “short”IntegerType int integer INT 4 int “integer”LongType bigint long BIGINT 8 bigint “long”FloatType float float FLOAT 4 float “float”DoubleType double double DOUBLE 8 double “double”DecimalType(10,3) decimal(10,3) decimal(10,3) DECIMAL(10,3) 8 decimal(10,3)“decimal(10,3)”BooleanType boolean boolean BOOLEAN 1 boolean “boolean”ByteType tinyint byte TINYINT 1 tinyint “byte”BinaryType binary binary BINARY 100 binary “binary”DateType date date DATE 4 date “date”TimestampType timestamp timestamp TIMESTAMP 8 timestamp “timestamp”三个复合类型:DataType simpleString typeName sql defaultSize catalogString jsonArrayType(IntegerType, true) array array ARRAY 4 array{“type”:”array”,”elementType”:”integer”,”containsNull”:true}MapType(StringType, LongType, true) map map MAP28 map{“type”:”map”,”keyType”:”string”,”valueType”:”long”,”valueContainsNull”:true}StructType(StructField(“sf”, DoubleType)::Nil) struct structSTRUCT 8 struct{“type”:”struct”,”fields”:[{“name”:”sf”,”type”:”double”,”nullable”:true,”metadata”:{}回答于 2022-11-17抢首赞查看全部7个回答IO分类:按照数据流向分类:输入流输出流按照处理的单位划分:字节流:字节流读取的都是文件中的二进制数据,读取到的二进制数据不会经过任何处理字符流:字符流读取的数据都是以字符为单位的,字符流也是读取的文件的二进制数据,只不过会把这些二进制数据转换成我们能识别的字符字符流 = 字节流 + 解码输出字节流::------------------|OutputStream 所有输出字节流的基类 抽象类-----------|FileOutputStream 向文件输出数据的输出字节流 throws FileNotFoundExceptionFileOutputStream的使用步骤:1.找到目标文件2.建立数据通道3.把数据转换成字节数组写出4.关闭资源FileOutputStream的一些方法:close() 关闭此文件输出流并释放与此流有关的所有系统资源。write(int b) 将指定字节写入此文件输出流。write(byte[] b)   将 b.length 个字节从指定 byte 数组写入此文件输出流中。write(byte[] b, int off, int len) 将指定 byte 数组中从偏移量 off 开始的 len 个字节写入此文件输出流。注意:1.write(byte b[])方法实际上是调用了 write(byte b[], int off, int len)方法FileOutputStream要注意的细节:1.使用FileOutputStream的时候,如果目标文件不存在,那么会创建目标文件对象,然后把数据写入2.使用FileOutputStream的时候,如果目标文件已经存在,那么会清空目标文件的数据后再写入数据,如果需要再原数据的上追加数据,需要使用FileOutputStream(file,true)构造函数3.使用FileOutputStream的write方法写数据的时候,虽然接受的是一个int类型的数据,但真正写出的只是一个字节的数据,只是把低八位的二进制数据写出,其他二十四位数据全部丢弃复制代码public class Demo2 {public static void main(String[] args) {//1.找到目标文件File file = new File("D:\新建文件夹 (2)\a.txt");System.out.println("文件的源文数据是:"+readFile(file));writeFile(file,"Hello World!");System.out.println("目前文件数据是:"+readFile(file));}//输出数据public static void writeFile(File file,String data) {FileOutputStream fileOutputStream = null;try{//2.建立通道(我的源文件数据:abc 追加数据)fileOutputStream = new FileOutputStream(file,true);//3.把要写入的数据转换成字符数组fileOutputStream.write(data.getBytes());}catch(IOException e) {throw new RuntimeException(e);}finally {//关闭资源try {fileOutputStream.close();} catch (IOException e) {throw new RuntimeException(e);}}}//输入数据public static String readFile(File file) {FileInputStream fileInputStream = null;String str = "";try {//建立数据通道fileInputStream = new FileInputStream(file);//创建缓冲字节数组byte[] buf = new byte[1024];int length = 0;while((length = fileInputStream.read(buf))!=-1) {//把字节数组转换成字符串返回str+=new String(buf,0,length);}} catch (IOException e) {throw new RuntimeException();}finally {try {fileInputStream.close();} catch (IOException e) {throw new RuntimeException(e);}}return str;}

数据库中SHRINKDATABASE是什么意思

是数据库收缩功能,将数据库压缩。压缩一些空间,整理一下数据页面.

shrinkfile和shrinkdatabase哪个更有效

DBCC SHRINKDATABASE 收缩指定数据库中的数据文件大校 语法 DBCC SHRINKDATABASE ( database_name [ , target_percent ] [ , { NOTRUNCATE | TRUNCATEONLY } ] ) 参数 database_name 是要收缩的数据库名称。数据库名称必须符合标识符的规则。有...

kdatacenter的VPS是单向还是双向流量计算

是双向,以kdaracenter为代表的这类服务商都这样。

Kdata金田S3-64GBSSD固态硬盘笔记评价如何,好用吗

KDATA垃圾硬盘, 用不到1年就坏, 坏了给换新, 结果用了不到半年,自己起火了正常使用中,突然起火, 把我电源接头都烧烂了, 我电脑挂了3块硬盘,就他的着火了, 找客服, 一开始还很有抱歉的意思,说给我换, 我也就不追究其他的了, 结果后来换了个客户给我说只能修, 再后来, 直接说修不了, 还说是我的问题造成的硬盘也不要了, 就想让各位看观看看这硬盘的质量, 到底多垃圾!!

Kdata/金田 S3-128GB固态硬盘怎么样,性能如何

买过两个,都会出现丢数据的问题主要体现在安装了启动盘过段时间后不能用复制进去的安装包,过段时间用就会报错。我都不敢往里放东西了,尤其是工作文件等

KDATA(金田)U盘真的是SLC芯片吗?

芯片不知道 你查1下是不是JS29F16B08HCND1 这个型号的 若是的话配银灿主控902的应该,不能CDROM貌似 ,用主控902E的应该可以 读写150-200

请问vb代码 S = kData(20) * 256& + cData(21) 是什么意思,特别是“* 256&”不明白!

S 和 kData、cData是什么类型的? 应该字符串吧?&是字符串连接符,至于那个*256.。。不好说啊,移位吗= =

KDATA这个牌子的固态硬盘怎么样

您好 2246EN的主控,南亚256M缓存,海力士的MLC颗粒(据说没有东芝的好),电路部分基本没有缩水,但工艺实在是...手工一般啊 建议您购买 三星或者是闪迪的, 望采纳 谢谢

kdata金田 固态硬盘怎么样

  规格参数编辑  主体  品牌KDATA  系列USB3.0  型号KF31-128G  颜色黑色  外壳材质金属  规格  容量128G  指示灯有  尺寸8.5mm高*18mm宽*69mm长  重量13g  特性  独特设计金属运动款,透明USB盖帽可连接尾部,防丢!  包装清单编辑  KDATA KF31-128G × 1 塑料包装 × 1 挂绳 × 1

kdata金田的u盘怎么样

好。kdata金田的u盘是一款读写速度非常快的u盘,所以好。u盘是通过USB接口接入的储存盘,是闪存的一种,可以做到便捷式实现即插即用。

kdata是什么牌子

是深圳金田科技。深圳金田科技设于中国硅谷、经济特区--深圳市,是集科研、设计、生产、营销于一体的高科技企业。现公司拥有一支高素质、高效率的员工团队,五大事业部(品牌运营部、外贸出口部、OEM销售部、生产部、工程研发部)。公司创业以来,一直奉行:以优良品质创信誉,持续改进促发展的经营理念,坚持不断创新,满足客户要求。弘扬:团结和谐、努力进取、不断更新自我的企业文化。力求成为行业楷模,达到营销、管理、技术、产品、服务五样最好。重视人才、品牌、形象,三大经营发展战略,全面提升企业整体素质与管理水平,与时俱进,增强企业竞争力,永做第一。企业产品适用打印材质金田平板彩印机可将高精度的图像打印在物体材质的表面,适用的材质如:亚克力、木竹材料、石材、金属、皮革、水晶玻璃类、瓷质、各种塑料(ABS、PC、PE、PP、PU)铜版纸、瓷砖/瓷器等产品以及各种成份复杂的有机物体上。适用打印产品U盘、MP3/MP4、移动硬盘、手机壳、亚克力制品、光盘、礼品、玩具、文具、家私、瓷器、广告牌、玻璃移门。

linkdata.idx正由另一进程进行,该进程无法访问怎么解决

进入任务管理器 先把要访问的进程关闭,就可以了

真三国无双5中linkdata.idx的作用是什么?

里面有各个武将的技能,攻击动作,贴图等

请问如何修改 无双大蛇里的LINKDATA_BNS.IDX这个动作索引文件。

用16进制编辑软件比如UE,WINHEX等可以打开并编辑IDX文件(推荐后者)修改器功能没那么完善,有些东西是要用这些编辑软件才能改的IDX文件是BIN文件的索引,实际内容在BIN文件里这段话是引用绿燕的吧,如果你真的想修改游戏的话最好把教程认真看一下还有就是不要忘了备份

关于无 双大蛇Z 找不到LINKDATA_BNS文件问题

你是不是搞错了啊,无双大蛇Z本来就没有LINKDATA_BNS这个文件,LINKDATA_BNS这个文件是无双大蛇(注意没有Z字)的,而且你说的改图是指人物MOD之类的么?无双大蛇Z用的是LINKDATA1.BIN来修改的,你用的修改器是用来修改无双大蛇的吧,不能用在Z上面。自己去网上查查用来修改Z的凉宫修改器版本吧。

关于-无双大蛇Z-DATA文件夹下没有LINKDATA_BNS文件!

我只知道我的无双大蛇Z的DATA文件夹下没有LINKDATA_BNS,无双大蛇Z文件夹下有LINKDATA1(bin文件1.52G)LINKDATA1.IDX(75K),LINKDATA2(bin文件1.71G),LINKDATA2.IDX(13k)LINKDATA3(bin文件1.26G),LINKDATA3.IDX(15.4K)