sábado, 28 de junio de 2025

Subir un archivo CSV a una tabla en GCP BigQuery

En una entrega pasada vimos como cargar datos desde un CSV a una tabla en BigQuery.

Esta vez crearemos un programa con Java y BigQuery. Para ello necesitamos tener un archivo con datos el cual llamaremos "DATOS.txt". El contenido del archivo será similar a esto:

20250412,34,450.0
20250412,34,432.0
20250412,122,500.0

Tendremos una tabla llamada ``tkdata`` con los siguientes campos:

fchinf DATE 
numregs INT64 
valor STRING 

Requisitos:

  • Tener acceso a GCP BigQuery.
  • Tener JDK 11 o más actual. 
  • Tener Maven (el más actual).

¿Qué hará el programa?

  1. Verificar si existe el archivo a subir ("DATOS.txt"). 
  2. Obtener su contenido y guardarlo en una lista tipo String. 
  3. Crear un objeto tipo StringBuilder a partir de la lista tipo String. 
  4. Enviar el contenido del objeto StringBuilder a un nuevo archivo ("tkdata.csv") que se guardará en el mismo bucket del archivo original. 
  5. Cargar el contenido del nuevo archivo a la tabla ``tkdata``.
  6. Verificar que los datos hayan sido cargados a la tabla.

BigQueryCsvUploader.java

import com.google.cloud.bigquery.*;
import com.google.cloud.storage.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class BigQueryCsvUploader {
    private static final Logger LOG = LoggerFactory.getLogger(BigQueryCsvUploader.class);
    private static final String NAME_FILE_ORIGINAL = "DATOS.csv";
    private static final String TABLE_NAME = "tkdata";
    private static final String DATASET = "mydataset";
    private static final String PROJECT = "myproject";
    private static final String BUCKET = "mybucket";
    private static final String NAME_NEW_FILE = "tkdata.csv";
    private static final long MAX_SIZE_BYTES = 300 * 1024 * 1024; // 300 MB

   
    public static class Tkdata {
        private Date fchinf;
        private long numregs;
        private String valor;

        public Date getFchinf() { return fchinf; }
        public void setFchinf(String fchinf) throws ParseException {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            this.fchinf = sdf.parse(fchinf);
        }
        public long getNumregs() { return numregs; }
        public void setNumregs(String numregs) { this.numregs = Long.parseLong(numregs); }
        public String getValor() { return valor; }
        public void setValor(String valor) { this.valor = valor; }
    }

   
    public static boolean existFile(String bucketName, String fileName) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        Blob blob = storage.get(BlobId.of(bucketName, fileName));
        return blob != null && blob.exists();
    }

   
    public static boolean maxSize(String bucketName, 
     String fileName, long maxSizeBytes) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        Blob blob = storage.get(BlobId.of(bucketName, fileName));
        if (blob == null) {
            LOG.error("El archivo {} no existe en el bucket {}", fileName, bucketName);
            return false;
        }
        return blob.getSize() <= maxSizeBytes;
    }

    
    public static List<Tkdata> getListTkdata(String bucketName, String fileName) {
        List<Tkdata> listaTkdata = new ArrayList<>();
        Storage storage = StorageOptions.getDefaultInstance().getService();
        Blob blob = storage.get(BlobId.of(bucketName, fileName));
        if (blob == null) {
            LOG.error("Archivo{} no encontrado en el bucket {}", fileName, bucketName);
            return listaTkdata;
        }

        String content = new String(blob.getContent(), StandardCharsets.UTF_8);
        try (BufferedReader reader = new BufferedReader(new StringReader(content))) {
            String line;
            reader.readLine();
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split(",", -1); 
                if (parts.length == 3) {
                    Tkdata obj = new Tkdata();
                    try {
                        obj.setFchinf(parts[0].trim());
                        obj.setNumregs(parts[1].trim());
                        obj.setValor(parts[2].trim());
                        listaTkdata.add(obj);
                    } catch (ParseException | NumberFormatException e) {
                        LOG.error("Error parseando linea: {}", line, e);
                    }
                } else {
                    LOG.warn("Linea malformada: {}", line);
                }
            }
        } catch (IOException e) {
            LOG.error("Error al leer el contenido", e);
        }
        return listaTkdata;
    }

    
    public static StringBuilder convertToStringBuilder(List<Tkdata> listaTkdata) {
        StringBuilder sb = new StringBuilder();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        for (Tkdata item : listaTkdata) {
            sb.append(sdf.format(item.getFchinf())).append(",");
            sb.append(item.getNumregs()).append(",");
            sb.append(item.getValor()).append("\n");
        }
        return sb;
    }

   
    public static boolean creaNuevoFileCSV(String bucketName, 
       String fileName, String content) {
        try {
            Storage storage = StorageOptions.getDefaultInstance().getService();
            BlobId blobId = BlobId.of(bucketName, fileName);
            BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/csv").build();
            storage.create(blobInfo, content.getBytes(StandardCharsets.UTF_8));
            return true;
        } catch (Exception e) {
            LOG.error("Error al subir archivo {} al bucket {}", fileName, bucketName, e);
            return false;
        }
    }

    
    public static boolean loadCSVToTableBigQuery(String projectId, 
     String datasetId, String bucketName, String sourceFileName, String tableName) {
        try {
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
            TableId tableId = TableId.of(projectId, datasetId, tableName);

            
            Schema schema = Schema.of(
                Field.of("fchinf", StandardSQLTypeName.DATE),
                Field.of("numregs", StandardSQLTypeName.INT64),
                Field.of("valor", StandardSQLTypeName.STRING)
            );

           
            JobConfiguration jobConfig = LoadJobConfiguration.newBuilder(
                tableId,
                String.format("gs://%s/%s", bucketName, sourceFileName),
                FormatOptions.csv()
            )
                .setSchema(schema)
                .setSkipLeadingRows(0) 
                .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND) 
                .build();

            
            Job job = bigquery.create(JobInfo.of(jobConfig));
            job = job.waitFor();
            if (job.isDone() && job.getStatus().getError() == null) {
                LOG.info("CSV {} successfully loaded into BigQuery table {}.{}", 
                 sourceFileName, datasetId, tableName);
                return true;
            } else {
                LOG.error("Error cargando CSV a BigQuery: {}", job.getStatus().getError());
                return false;
            }
        } catch (Exception e) {
            LOG.error("Error en el proceso de carga", e);
            return false;
        }
    }

  
    public static boolean validateTableData(String projectId, 
      String datasetId, String tableName, List<Tkdata> expectedData) {
        try {
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
            String query = String.format("SELECT fchinf, numregs, valor FROM %s.%s.%s", 
                        projectId, datasetId, tableName);
            QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build();

           
            TableResult result = bigquery.query(queryConfig);
            long rowCount = result.getTotalRows();

            if (rowCount == 0) {
                LOG.error("No hay datos en la tabla {}.{}", datasetId, tableName);
                return false;
            }

           
            if (expectedData != null && rowCount != expectedData.size()) {
                LOG.warn("Error en conteo de datos {}, found {}", expectedData.size(), rowCount);
                return false;
            }

            
            LOG.info("Sample data from {}.{} (first 5 rows):", datasetId, tableName);
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            int maxRowsToLog = 5;
            int rowIndex = 0;
            for (FieldValueList row : result.iterateAll()) {
                if (rowIndex >= maxRowsToLog) break;
                String fchinf = row.get("fchinf").getStringValue(); 
                long numregs = row.get("numregs").getLongValue();
                String valor = row.get("valor").getStringValue();
                LOG.info("Row {}: fchinf={}, numregs={}, valor={}", rowIndex + 1, fchinf, numregs, valor);
                rowIndex++;
            }

            LOG.info("Validacion exitosa: {} rows found in table {}.{}", 
          rowCount, datasetId, tableName);
            return true;
        } catch (Exception e) {
            LOG.error("Error validando datos en la tabla {}.{}", datasetId, tableName, e);
            return false;
        }
    }

    public static void main(String[] args) {
     
        if (!existFile(BUCKET, NAME_FILE_ORIGINAL)) {
            LOG.error("Archivo{} no existe en el buckett {}.", NAME_FILE_ORIGINAL, BUCKET);
            return;
        }

        
        if (!maxSize(BUCKET, NAME_FILE_ORIGINAL, MAX_SIZE_BYTES)) {
            LOG.error("El archivo {} excede el tamaño: 300MB.", NAME_FILE_ORIGINAL);
            return;
        }

        
        List<Tkdata> listaTkdata = getListTkdata(BUCKET, NAME_FILE_ORIGINAL);
        if (listaTkdata.isEmpty()) {
            LOG.error("No hay datos para leer {}. ", NAME_FILE_ORIGINAL);
            return;
        }

        StringBuilder sb = convertToStringBuilder(listaTkdata);

        
        if (creaNuevoFileCSV(BUCKET, NAME_NEW_FILE, sb.toString())) {
            LOG.info("Archivo {} cargado al bucket {}", NAME_NEW_FILE, BUCKET);
            LOG.info("Carga de datos {} a la tabla: {}.{}", NAME_NEW_FILE, DATASET, TABLE_NAME);
            if (loadCSVToTableBigQuery(PROJECT, DATASET, BUCKET, NAME_NEW_FILE, TABLE_NAME)) {
                if (validateTableData(PROJECT, DATASET, TABLE_NAME, listaTkdata)) {
                    LOG.info("Validacion correcta {}.{}", DATASET, TABLE_NAME);
                } else {
                    LOG.error("Validacion fallida {}.{}", DATASET, TABLE_NAME);
                }
            } else {
                LOG.error("Fallo al cargar los datos a la tabla de BigQuery  {}.{}", DATASET, TABLE_NAME);
            }
        } else {
            LOG.error("Fallo al cargar el archivo {} al bucket {}", NAME_NEW_FILE, BUCKET);
        }
    }
}

Es importante hacer notar que los tipos de datos en el archivo deben concordar a los datos de la tabla. En caso contrario, no cargarán.

También es importante tener estas dependencias en el pom.xml

<dependencies>
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-storage</artifactId>
        <version>2.44.0</version>
    </dependency>
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-bigquery</artifactId>
        <version>2.44.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.13</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.13</version>
    </dependency>
</dependencies>

De preferncia usar Eclipse IDE para generar un .jar o ejecutarlo directamente.

Si la ejecución es correcta podremos ver los datos cargados en la tabla. Continuaremos más sobre BigQuery en próximas entregas.

Enlaces:

https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv

No hay comentarios:

Publicar un comentario

Kubernetes en un vistazo

Continuamos con esta serie de entregas sobre Docker y Kubernetes Según la documentación oficial Kubernetes (k8s) es una plataforma de...

Etiquetas

Archivo del blog