Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions flight/flight-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ under the License.
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
Expand Down Expand Up @@ -145,6 +146,12 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.20.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
1 change: 0 additions & 1 deletion flight/flight-core/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
requires com.google.protobuf;
requires com.google.protobuf.util;
requires io.grpc;
requires io.grpc.internal;
requires io.grpc.netty;
requires io.grpc.protobuf;
requires io.grpc.stub;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.io.ByteStreams;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.WireFormat;
import io.grpc.Drainable;
Expand All @@ -38,10 +36,11 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.arrow.flight.FlightDataParser.ArrowBufReader;
import org.apache.arrow.flight.FlightDataParser.FlightDataReader;
import org.apache.arrow.flight.FlightDataParser.InputStreamReader;
import org.apache.arrow.flight.grpc.AddWritableBuffer;
import org.apache.arrow.flight.grpc.GetReadableBuffer;
import org.apache.arrow.flight.impl.Flight.FlightData;
import org.apache.arrow.flight.impl.Flight.FlightDescriptor;
import org.apache.arrow.memory.ArrowBuf;
Expand All @@ -55,10 +54,14 @@
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.MetadataVersion;
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** The in-memory representation of FlightData used to manage a stream of Arrow messages. */
class ArrowMessage implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(ArrowMessage.class);

// If true, deserialize Arrow data by giving Arrow a reference to the underlying gRPC buffer
// instead of copying the data. Defaults to true.
public static final boolean ENABLE_ZERO_COPY_READ;
Expand All @@ -75,19 +78,10 @@ class ArrowMessage implements AutoCloseable {
if (zeroCopyWriteFlag == null) {
zeroCopyWriteFlag = System.getenv("ARROW_FLIGHT_ENABLE_ZERO_COPY_WRITE");
}
ENABLE_ZERO_COPY_READ = !"false".equalsIgnoreCase(zeroCopyReadFlag);
ENABLE_ZERO_COPY_READ = true; // !"false".equalsIgnoreCase(zeroCopyReadFlag);
ENABLE_ZERO_COPY_WRITE = "true".equalsIgnoreCase(zeroCopyWriteFlag);
}

private static final int DESCRIPTOR_TAG =
(FlightData.FLIGHT_DESCRIPTOR_FIELD_NUMBER << 3) | WireFormat.WIRETYPE_LENGTH_DELIMITED;
private static final int BODY_TAG =
(FlightData.DATA_BODY_FIELD_NUMBER << 3) | WireFormat.WIRETYPE_LENGTH_DELIMITED;
private static final int HEADER_TAG =
(FlightData.DATA_HEADER_FIELD_NUMBER << 3) | WireFormat.WIRETYPE_LENGTH_DELIMITED;
private static final int APP_METADATA_TAG =
(FlightData.APP_METADATA_FIELD_NUMBER << 3) | WireFormat.WIRETYPE_LENGTH_DELIMITED;

private static final Marshaller<FlightData> NO_BODY_MARSHALLER =
ProtoUtils.marshaller(FlightData.getDefaultInstance());

Expand Down Expand Up @@ -143,6 +137,8 @@ public static HeaderType getHeader(byte b) {
private final ArrowBuf appMetadata;
private final List<ArrowBuf> bufs;
private final boolean tryZeroCopyWrite;
// For zero-copy reads, this releases the message-scoped allocator after local buffers close.
private final BufferAllocator messageAllocator;

public ArrowMessage(FlightDescriptor descriptor, Schema schema, IpcOption option) {
this.writeOption = option;
Expand All @@ -153,6 +149,7 @@ public ArrowMessage(FlightDescriptor descriptor, Schema schema, IpcOption option
this.descriptor = descriptor;
this.appMetadata = null;
this.tryZeroCopyWrite = false;
this.messageAllocator = null;
}

/**
Expand All @@ -172,6 +169,7 @@ public ArrowMessage(
this.descriptor = null;
this.appMetadata = appMetadata;
this.tryZeroCopyWrite = tryZeroCopy;
this.messageAllocator = null;
}

public ArrowMessage(ArrowDictionaryBatch batch, IpcOption option) {
Expand All @@ -185,6 +183,7 @@ public ArrowMessage(ArrowDictionaryBatch batch, IpcOption option) {
this.descriptor = null;
this.appMetadata = null;
this.tryZeroCopyWrite = false;
this.messageAllocator = null;
}

/**
Expand All @@ -200,6 +199,7 @@ public ArrowMessage(ArrowBuf appMetadata) {
this.descriptor = null;
this.appMetadata = appMetadata;
this.tryZeroCopyWrite = false;
this.messageAllocator = null;
}

public ArrowMessage(FlightDescriptor descriptor) {
Expand All @@ -210,13 +210,23 @@ public ArrowMessage(FlightDescriptor descriptor) {
this.descriptor = descriptor;
this.appMetadata = null;
this.tryZeroCopyWrite = false;
this.messageAllocator = null;
}

private ArrowMessage(
ArrowMessage(
FlightDescriptor descriptor,
MessageMetadataResult message,
ArrowBuf appMetadata,
ArrowBuf buf) {
this(descriptor, message, appMetadata, buf, null);
}

ArrowMessage(
FlightDescriptor descriptor,
MessageMetadataResult message,
ArrowBuf appMetadata,
ArrowBuf buf,
BufferAllocator messageAllocator) {
// No need to take IpcOption as this is used for deserialized ArrowMessage coming from the wire.
this.writeOption =
message != null
Expand All @@ -229,6 +239,7 @@ private ArrowMessage(
this.appMetadata = appMetadata;
this.bufs = buf == null ? ImmutableList.of() : ImmutableList.of(buf);
this.tryZeroCopyWrite = false;
this.messageAllocator = messageAllocator;
}

public MessageMetadataResult asSchemaMessage() {
Expand Down Expand Up @@ -269,6 +280,7 @@ public ArrowDictionaryBatch asDictionaryBatch() throws IOException {
bufs.size() == 1, "A batch can only be consumed if it contains a single ArrowBuf.");
Preconditions.checkArgument(getMessageType() == HeaderType.DICTIONARY_BATCH);
ArrowBuf underlying = bufs.get(0);

// Retain a reference to keep the batch alive when the message is closed
underlying.getReferenceManager().retain();
// Do not set drained - we still want to release our reference
Expand All @@ -280,101 +292,16 @@ public Iterable<ArrowBuf> getBufs() {
}

private static ArrowMessage frame(BufferAllocator allocator, final InputStream stream) {

try {
FlightDescriptor descriptor = null;
MessageMetadataResult header = null;
ArrowBuf body = null;
ArrowBuf appMetadata = null;
while (stream.available() > 0) {
final int tagFirstByte = stream.read();
if (tagFirstByte == -1) {
break;
}
int tag = readRawVarint32(tagFirstByte, stream);
switch (tag) {
case DESCRIPTOR_TAG:
{
int size = readRawVarint32(stream);
byte[] bytes = new byte[size];
ByteStreams.readFully(stream, bytes);
descriptor = FlightDescriptor.parseFrom(bytes);
break;
}
case HEADER_TAG:
{
int size = readRawVarint32(stream);
byte[] bytes = new byte[size];
ByteStreams.readFully(stream, bytes);
header = MessageMetadataResult.create(ByteBuffer.wrap(bytes), size);
break;
}
case APP_METADATA_TAG:
{
int size = readRawVarint32(stream);
appMetadata = allocator.buffer(size);
GetReadableBuffer.readIntoBuffer(stream, appMetadata, size, ENABLE_ZERO_COPY_READ);
break;
}
case BODY_TAG:
if (body != null) {
// only read last body.
body.getReferenceManager().release();
body = null;
}
int size = readRawVarint32(stream);
body = allocator.buffer(size);
GetReadableBuffer.readIntoBuffer(stream, body, size, ENABLE_ZERO_COPY_READ);
break;

default:
// ignore unknown fields.
}
}
// Protobuf implementations can omit empty fields, such as body; for some message types, like
// RecordBatch,
// this will fail later as we still expect an empty buffer. In those cases only, fill in an
// empty buffer here -
// in other cases, like Schema, having an unexpected empty buffer will also cause failures.
// We don't fill in defaults for fields like header, for which there is no reasonable default,
// or for appMetadata
// or descriptor, which are intended to be empty in some cases.
if (header != null) {
switch (HeaderType.getHeader(header.headerType())) {
case SCHEMA:
// Ignore 0-length buffers in case a Protobuf implementation wrote it out
if (body != null && body.capacity() == 0) {
body.close();
body = null;
}
break;
case DICTIONARY_BATCH:
case RECORD_BATCH:
// A Protobuf implementation can skip 0-length bodies, so ensure we fill it in here
if (body == null) {
body = allocator.getEmpty();
}
break;
case NONE:
case TENSOR:
default:
// Do nothing
break;
}
FlightDataReader reader;
if (ENABLE_ZERO_COPY_READ) {
reader = ArrowBufReader.tryArrowBufReader(allocator, stream);
if (reader != null) {
return reader.toMessage();
}
return new ArrowMessage(descriptor, header, appMetadata, body);
} catch (Exception ioe) {
throw new RuntimeException(ioe);
}
}

private static int readRawVarint32(InputStream is) throws IOException {
int firstByte = is.read();
return readRawVarint32(firstByte, is);
}

private static int readRawVarint32(int firstByte, InputStream is) throws IOException {
return CodedInputStream.readRawVarint32(firstByte, is);
reader = new InputStreamReader(allocator, stream);
return reader.toMessage();
}

/**
Expand Down Expand Up @@ -586,6 +513,6 @@ public ArrowMessage parse(InputStream stream) {

@Override
public void close() throws Exception {
AutoCloseables.close(Iterables.concat(bufs, Collections.singletonList(appMetadata)));
AutoCloseables.close(Iterables.concat(bufs, AutoCloseables.iter(appMetadata, messageAllocator)));
}
}
Loading