package org.apache.arrow.flight;

import com.google.common.base.Preconditions;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.auth.ServerAuthHandler;
import org.apache.arrow.flight.auth.ServerAuthWrapper;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.FlightServiceGrpc;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/arrow/flight/FlightService.class */
/**
 * gRPC service implementation that forwards each Flight call to a {@link FlightProducer}.
 *
 * <p>Every handler follows the same shape: convert the protocol request into its wrapper type,
 * hand it to the producer, and report any exception thrown by the producer through
 * {@code onError} on the response observer instead of letting it propagate.
 *
 * <p>NOTE(review): the cached thread pool held in {@code executors} is created per instance and
 * nothing in this class shuts it down; confirm that the owner of this service handles its
 * lifecycle.
 */
public class FlightService extends FlightServiceGrpc.FlightServiceImplBase {
    private static final Logger logger = LoggerFactory.getLogger(FlightService.class);

    /**
     * Value passed as the second argument of every {@link FlightStream} created by
     * {@link #doPutCustom}. Presumably the number of inbound messages kept pending; confirm
     * against {@code FlightStream}.
     */
    private static final int PENDING_REQUESTS = 5;

    private final BufferAllocator allocator;
    private final FlightProducer producer;
    private final ServerAuthHandler authHandler;

    /** Runs the handshake wrapper and the blocking {@code acceptPut} calls. */
    private final ExecutorService executors = Executors.newCachedThreadPool();

    /**
     * Adapts a gRPC response observer to the {@link FlightProducer.ServerStreamListener}
     * callbacks a producer uses while answering a get request.
     */
    private static class GetListener implements FlightProducer.ServerStreamListener {
        private final ServerCallStreamObserver<ArrowMessage> responseObserver;

        /** Created by {@link #start}; stays {@code null} until then. */
        private volatile VectorUnloader unloader;

        /**
         * Wraps the given observer, registers a cancel handler on it and disables automatic
         * inbound flow control.
         *
         * @param streamObserver the response observer supplied for the call
         * @throws ClassCastException if {@code streamObserver} is not a
         *     {@link ServerCallStreamObserver}
         */
        public GetListener(StreamObserver<ArrowMessage> streamObserver) {
            this.responseObserver = (ServerCallStreamObserver<ArrowMessage>) streamObserver;
            this.responseObserver.setOnCancelHandler(this::onCancel);
            this.responseObserver.disableAutoInboundFlowControl();
        }

        /** Cancel handler: only records the cancellation at debug level. */
        private void onCancel() {
            logger.debug("Stream cancelled by client.");
        }

        /** @return the readiness reported by the underlying response observer */
        @Override
        public boolean isReady() {
            return this.responseObserver.isReady();
        }

        /** @return the cancellation state reported by the underlying response observer */
        @Override
        public boolean isCancelled() {
            return this.responseObserver.isCancelled();
        }

        /**
         * Sends a schema-only message for {@code vectorSchemaRoot} and then prepares the
         * unloader that {@link #putNext()} draws record batches from.
         *
         * @param vectorSchemaRoot the root whose schema is sent and whose contents are unloaded
         */
        @Override
        public void start(VectorSchemaRoot vectorSchemaRoot) {
            this.responseObserver.onNext(new ArrowMessage(null, vectorSchemaRoot.getSchema()));
            this.unloader = new VectorUnloader(vectorSchemaRoot, true, false);
        }

        /**
         * Sends the unloader's current record batch to the client.
         *
         * @throws NullPointerException if {@link #start} has not been called yet
         */
        @Override
        public void putNext() {
            // Read the volatile field once so the null check and the use see the same instance.
            final VectorUnloader current =
                    Preconditions.checkNotNull(this.unloader, "start() must be called before putNext()");
            this.responseObserver.onNext(new ArrowMessage(current.getRecordBatch()));
        }

        /**
         * Forwards the failure to the response observer.
         *
         * @param th the failure to report
         */
        @Override
        public void error(Throwable th) {
            this.responseObserver.onError(th);
        }

        /** Signals completion on the response observer. */
        @Override
        public void completed() {
            this.responseObserver.onCompleted();
        }
    }

    /**
     * Creates the service.
     *
     * @param bufferAllocator allocator handed to each {@link FlightStream} created for a put
     * @param flightProducer producer that every call is delegated to
     * @param serverAuthHandler handler passed to {@link ServerAuthWrapper#wrapHandshake}
     */
    public FlightService(BufferAllocator bufferAllocator, FlightProducer flightProducer, ServerAuthHandler serverAuthHandler) {
        this.allocator = bufferAllocator;
        this.producer = flightProducer;
        this.authHandler = serverAuthHandler;
    }

    /**
     * Delegates the handshake to {@link ServerAuthWrapper#wrapHandshake} using this service's
     * auth handler and executor.
     *
     * @param streamObserver observer for handshake responses
     * @return the request observer produced by the wrapper
     */
    @Override
    public StreamObserver<Flight.HandshakeRequest> handshake(StreamObserver<Flight.HandshakeResponse> streamObserver) {
        return ServerAuthWrapper.wrapHandshake(this.authHandler, streamObserver, this.executors);
    }

    /**
     * Asks the producer to list flights for the criteria, converting each result to its protocol
     * form before it reaches {@code streamObserver}.
     *
     * @param criteria protocol-level criteria, wrapped in {@link Criteria} for the producer
     * @param streamObserver observer receiving the results, or the producer's exception
     */
    @Override
    public void listFlights(Flight.Criteria criteria, StreamObserver<Flight.FlightGetInfo> streamObserver) {
        try {
            this.producer.listFlights(
                    new Criteria(criteria),
                    StreamPipe.wrap(streamObserver, info -> info.toProtocol()));
        } catch (Exception e) {
            streamObserver.onError(e);
        }
    }

    /**
     * Asks the producer to stream the data for a ticket through a new {@link GetListener}.
     *
     * @param ticket protocol-level ticket, wrapped in {@link Ticket} for the producer
     * @param streamObserver observer receiving the messages, or the exception thrown while
     *     setting up or running the producer call
     */
    public void doGetCustom(Flight.Ticket ticket, StreamObserver<ArrowMessage> streamObserver) {
        try {
            this.producer.getStream(new Ticket(ticket), new GetListener(streamObserver));
        } catch (Exception e) {
            streamObserver.onError(e);
        }
    }

    /**
     * Runs an action on the producer and sends its single result followed by completion.
     *
     * @param action protocol-level action, wrapped in {@link Action} for the producer
     * @param streamObserver observer receiving the result, or the exception thrown on the way
     */
    @Override
    public void doAction(Flight.Action action, StreamObserver<Flight.Result> streamObserver) {
        try {
            streamObserver.onNext(this.producer.doAction(new Action(action)).toProtocol());
            streamObserver.onCompleted();
        } catch (Exception e) {
            streamObserver.onError(e);
        }
    }

    /**
     * Asks the producer to list its action types, converting each to its protocol form before
     * it reaches {@code streamObserver}.
     *
     * @param empty unused request placeholder
     * @param streamObserver observer receiving the action types, or the producer's exception
     */
    @Override
    public void listActions(Flight.Empty empty, StreamObserver<Flight.ActionType> streamObserver) {
        try {
            this.producer.listActions(StreamPipe.wrap(streamObserver, type -> type.toProtocol()));
        } catch (Exception e) {
            streamObserver.onError(e);
        }
    }

    /**
     * Starts a put call: takes manual control of inbound flow, builds a {@link FlightStream} for
     * the incoming messages, and runs the producer's {@code acceptPut} callable on the executor.
     *
     * <p>The callable's result is sent to the client followed by completion; any exception from
     * the producer or from sending is reported through {@code onError}.
     *
     * @param streamObserver observer for the put result
     * @return the observer view of the created {@link FlightStream}, which receives the
     *     client's messages
     * @throws ClassCastException if {@code streamObserver} is not a
     *     {@link ServerCallStreamObserver}
     */
    public StreamObserver<ArrowMessage> doPutCustom(StreamObserver<Flight.PutResult> streamObserver) {
        final ServerCallStreamObserver<Flight.PutResult> responseObserver =
                (ServerCallStreamObserver<Flight.PutResult>) streamObserver;
        responseObserver.disableAutoInboundFlowControl();
        // Automatic flow control is off, so the first inbound message is requested explicitly.
        responseObserver.request(1);

        // NOTE(review): the third argument is passed as null; its meaning is defined by FlightStream.
        final FlightStream flightStream = new FlightStream(
                this.allocator,
                PENDING_REQUESTS,
                null,
                count -> responseObserver.request(count));

        this.executors.submit(() -> {
            try {
                responseObserver.onNext(this.producer.acceptPut(flightStream).call());
                responseObserver.onCompleted();
            } catch (Exception e) {
                responseObserver.onError(e);
            }
        });
        return flightStream.asObserver();
    }

    /**
     * Looks up the flight info for a descriptor and sends it followed by completion.
     *
     * @param flightDescriptor protocol-level descriptor, wrapped in {@link FlightDescriptor}
     *     for the producer
     * @param streamObserver observer receiving the info, or the exception thrown on the way
     */
    @Override
    public void getFlightInfo(Flight.FlightDescriptor flightDescriptor, StreamObserver<Flight.FlightGetInfo> streamObserver) {
        try {
            streamObserver.onNext(this.producer.getFlightInfo(new FlightDescriptor(flightDescriptor)).toProtocol());
            streamObserver.onCompleted();
        } catch (Exception e) {
            streamObserver.onError(e);
        }
    }
}
