domingo, 15 de diciembre de 2024

GCP BigQuery: cargar un CSV a una tabla y crear un respaldo

GCP BigQuery nos permite cargar datos de un fichero (con formato CSV, TXT, etc.) a tablas en nuestros datasets.

En ésta ocasión veremos como validar la existencia de un archivo CSV en un bucket (contenedor), cargarlo a una tabla temporal e insertarlo a una determinada tabla. Además veremos cómo crear un respaldo de ese archivo CSV a otro bucket.

BigQueryTransfer.java

import com.google.cloud.bigquery.*;
import com.google.cloud.storage.*;
import com.google.cloud.bigquerystorage.v1.*;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import java.io.*;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;

public class BigQueryTransfer {

    private static final String BUCKET_NAME = "your-bucket-name";
    private static final String BACKUP_BUCKET_NAME = "your-backup-bucket-name";
    private static final String DATASET_NAME = "mydataset";
    private static final String TABLE_NAME = "transferencia";
    private static final String PROJECT_ID = "myproject";
    private static final String CSV_FILE_NAME = "datos.csv";
    private static final String RESULT_CSV_NAME = "resultados.csv";

    public static void main(String[] args) throws Exception {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();

        // 1. Verificar la existencia del archivo CSV
        Blob blob = storage.get(BUCKET_NAME, CSV_FILE_NAME);
        if (blob == null) {
            System.out.println("El archivo " + CSV_FILE_NAME + " no existe en el bucket.");
            return;
        }

        // 2. Leer el contenido del archivo CSV
        List<String> listaContenido = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(Channels.newInputStream(blob.reader())))) {
            String line;
            while ((line = reader.readLine()) != null) {
                listaContenido.add(line);
            }
        }

        // 3. Crear la lista de Transferencia
        List<Transferencia> listaTransferencia = new ArrayList<>();
        for (String line : listaContenido) {
            String[] parts = line.split(",");
            if (parts.length >= 3) {
                Transferencia transferencia = new Transferencia(parts[0], parts[1], parts[2]);
                listaTransferencia.add(transferencia);
            }
        }

        // 4. Guardar en BigQuery usando BigQuery Storage Write API
        writeToBigQueryStorageApi(listaTransferencia);

        // 5. Crear "resultados.csv" con datos de la tabla
        String query = "SELECT * FROM `" + PROJECT_ID + "." + DATASET_NAME + "." + TABLE_NAME + "`";
        TableResult result = bigQuery.query(QueryJobConfiguration.newBuilder(query).build());

        File resultFile = new File(RESULT_CSV_NAME);
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(resultFile))) {
            for (FieldValueList row : result.iterateAll()) {
                writer.write(String.join(",",
                        row.get(0).getStringValue(),
                        row.get(1).getStringValue(),
                        row.get(2).getStringValue()));
                writer.newLine();
            }
        }

        // Subir "resultados.csv" al bucket
        BlobId resultBlobId = BlobId.of(BUCKET_NAME, RESULT_CSV_NAME);
        BlobInfo resultBlobInfo = BlobInfo.newBuilder(resultBlobId).build();
        storage.create(resultBlobInfo, Files.readAllBytes(resultFile.toPath()));

        // 6. Mover "datos.csv" al bucket de respaldo
        BlobId sourceBlobId = BlobId.of(BUCKET_NAME, CSV_FILE_NAME);
        BlobId backupBlobId = BlobId.of(BACKUP_BUCKET_NAME, CSV_FILE_NAME);
        storage.copy(Storage.CopyRequest.of(sourceBlobId, backupBlobId));
        storage.delete(sourceBlobId);

        System.out.println("Proceso completado con éxito.");
    }

    private static void writeToBigQueryStorageApi(List<Transferencia> listaTransferencia) throws Exception {
        try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
            String tablePath = String.format("projects/%s/datasets/%s/tables/%s", PROJECT_ID, DATASET_NAME, TABLE_NAME);

            WriteStream writeStream = WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build();
            WriteStream createdStream = client.createWriteStream(CreateWriteStreamRequest.newBuilder().setParent(tablePath).setWriteStream(writeStream).build());

            ProtoSchema protoSchema = client.getWriteStream(GetWriteStreamRequest.newBuilder().setName(createdStream.getName()).build()).getTableSchema();
            Descriptors.Descriptor descriptor = TableSchemaToDescriptor.parse(protoSchema.getProtoDescriptor());

            for (Transferencia transferencia : listaTransferencia) {
                Message.Builder messageBuilder = DynamicMessage.newBuilder(descriptor);
                messageBuilder.setField(descriptor.findFieldByName("fecha"), transferencia.fecha);
                messageBuilder.setField(descriptor.findFieldByName("clave"), transferencia.clave);
                messageBuilder.setField(descriptor.findFieldByName("cuenta"), transferencia.cuenta);

                client.appendRows(AppendRowsRequest.newBuilder()
                        .setWriteStream(createdStream.getName())
                        .addRows(ByteString.copyFrom(messageBuilder.build().toByteArray()))
                        .build());
            }

            client.finalizeWriteStream(FinalizeWriteStreamRequest.newBuilder().setName(createdStream.getName()).build());
        }
    }

    static class Transferencia {
        String fecha;
        String clave;
        String cuenta;

        Transferencia(String fecha, String clave, String cuenta) {
            this.fecha = fecha;
            this.clave = clave;
            this.cuenta = cuenta;
        }
    }
}

Con este programa:

  1. Verificamos la existencia del archivo "datos.csv". 
  2. Leemos su contenido y lo almacenamos en una lista tipo String. 
  3. Cargamos el contenido del archivo "datos.csv" a una tabla temporal. 
  4. Creamos una lista de tipo "Transferencia". 
  5. Guardamos los datos en la tabla "transferencia" usando BigQuery Storage Write API. 
  6. Creamos un archivo en el bucket llamado "resultados.csv" a partir de una consulta a la tabla "transferencia". 
  7. Creamos un respaldo de "datos.csv" a otro bucket.

Enlaces:

https://cloud.google.com/storage/docs/buckets
https://medium.com/@bravnic/bigquery-storage-write-api-at-scale-7affcc2d7a93
https://cloud.google.com/bigquery/docs/write-api
https://www.googlecloudcommunity.com/gc/Data-Analytics/Using-Google-Bigquery-Storage-Write-API-with-high-concurrency/m-p/662308

No hay comentarios:

Publicar un comentario