跳到主要内容

Row format 指南

Row format protocol

Java

public class Bar {
String f1;
List<Long> f2;
}

public class Foo {
int f1;
List<Integer> f2;
Map<String, Integer> f3;
List<Bar> f4;
}

RowEncoder<Foo> encoder = Encoders.bean(Foo.class);
Foo foo = new Foo();
foo.f1 = 10;
foo.f2 = IntStream.range(0, 1000000).boxed().collect(Collectors.toList());
foo.f3 = IntStream.range(0, 1000000).boxed().collect(Collectors.toMap(i -> "k"+i, i->i));
List<Bar> bars = new ArrayList<>(1000000);
for (int i = 0; i < 1000000; i++) {
Bar bar = new Bar();
bar.f1 = "s"+i;
bar.f2 = LongStream.range(0, 10).boxed().collect(Collectors.toList());
bars.add(bar);
}
foo.f4 = bars;
// Can be zero-copy read by python
BinaryRow binaryRow = encoder.toRow(foo);
// can be data from python
Foo newFoo = encoder.fromRow(binaryRow);
// zero-copy read List<Integer> f2
BinaryArray binaryArray2 = binaryRow.getArray(1);
// zero-copy read List<Bar> f4
BinaryArray binaryArray4 = binaryRow.getArray(3);
// zero-copy read 11th element of `readList<Bar> f4`
BinaryRow barStruct = binaryArray4.getStruct(10);

// zero-copy read 6th of f2 of 11th element of `readList<Bar> f4`
barStruct.getArray(1).getInt64(5);
RowEncoder<Bar> barEncoder = Encoders.bean(Bar.class);
// deserialize part of data.
Bar newBar = barEncoder.fromRow(barStruct);
Bar newBar2 = barEncoder.fromRow(binaryArray4.getStruct(20));

Python

@dataclass
class Bar:
f1: str
f2: List[pa.int64]
@dataclass
class Foo:
f1: pa.int32
f2: List[pa.int32]
f3: Dict[str, pa.int32]
f4: List[Bar]

encoder = pyfury.encoder(Foo)
foo = Foo(f1=10, f2=list(range(1000_000)),
f3={f"k{i}": i for i in range(1000_000)},
f4=[Bar(f1=f"s{i}", f2=list(range(10))) for i in range(1000_000)])
binary: bytes = encoder.to_row(foo).to_bytes()
print(f"start: {datetime.datetime.now()}")
foo_row = pyfury.RowData(encoder.schema, binary)
print(foo_row.f2[100000], foo_row.f4[100000].f1, foo_row.f4[200000].f2[5])
print(f"end: {datetime.datetime.now()}")

binary = pickle.dumps(foo)
print(f"pickle start: {datetime.datetime.now()}")
new_foo = pickle.loads(binary)
print(new_foo.f2[100000], new_foo.f4[100000].f1, new_foo.f4[200000].f2[5])
print(f"pickle end: {datetime.datetime.now()}")

Apache Arrow 支持

Apache Fury Format 还支持从 Arrow Table/RecordBatch 自动转换。

Java:

Schema schema = TypeInference.inferSchema(BeanA.class);
ArrowWriter arrowWriter = ArrowUtils.createArrowWriter(schema);
Encoder<BeanA> encoder = Encoders.rowEncoder(BeanA.class);
for (int i = 0; i < 10; i++) {
BeanA beanA = BeanA.createBeanA(2);
arrowWriter.write(encoder.toRow(beanA));
}
return arrowWriter.finishAsRecordBatch();

Python:

import pyfury
encoder = pyfury.encoder(Foo)
encoder.to_arrow_record_batch([foo] * 10000)
encoder.to_arrow_table([foo] * 10000)

C++:

std::shared_ptr<ArrowWriter> arrow_writer;
EXPECT_TRUE(
ArrowWriter::Make(schema, ::arrow::default_memory_pool(), &arrow_writer)
.ok());
for (auto &row : rows) {
EXPECT_TRUE(arrow_writer->Write(row).ok());
}
std::shared_ptr<::arrow::RecordBatch> record_batch;
EXPECT_TRUE(arrow_writer->Finish(&record_batch).ok());
EXPECT_TRUE(record_batch->Validate().ok());
EXPECT_EQ(record_batch->num_columns(), schema->num_fields());
EXPECT_EQ(record_batch->num_rows(), row_nums);
Schema schema = TypeInference.inferSchema(BeanA.class);
ArrowWriter arrowWriter = ArrowUtils.createArrowWriter(schema);
Encoder<BeanA> encoder = Encoders.rowEncoder(BeanA.class);
for (int i = 0; i < 10; i++) {
BeanA beanA = BeanA.createBeanA(2);
arrowWriter.write(encoder.toRow(beanA));
}
return arrowWriter.finishAsRecordBatch();