sábado, 28 de junio de 2025

Subir un archivo CSV a una tabla en GCP BigQuery

En una entrega pasada vimos como cargar datos desde un CSV a una tabla en BigQuery.

Esta vez crearemos un programa con Java y BigQuery. Para ello necesitamos tener un archivo con datos el cual llamaremos "DATOS.txt". El contenido del archivo será similar a esto:

20250412,34,450.0
20250412,34,432.0
20250412,122,500.0

Tendremos una tabla llamada ``tkdata`` con los siguientes campos:

fchinf DATE 
numregs INT64 
valor STRING 

Requisitos:

  • Tener acceso a GCP BigQuery.
  • Tener JDK 11 o más actual. 
  • Tener Maven (el más actual).

¿Qué hará el programa?

  1. Verificar si existe el archivo a subir ("DATOS.txt"). 
  2. Obtener su contenido y guardarlo en una lista tipo String. 
  3. Crear un objeto tipo StringBuilder a partir de la lista tipo String. 
  4. Enviar el contenido del objeto StringBuilder a un nuevo archivo ("tkdata.csv") que se guardará en el mismo bucket del archivo original. 
  5. Cargar el contenido del nuevo archivo a la tabla ``tkdata``.
  6. Verificar que los datos hayan sido cargados a la tabla.

BigQueryCsvUploader.java

import com.google.cloud.bigquery.*;
import com.google.cloud.storage.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class BigQueryCsvUploader {
    private static final Logger LOG = LoggerFactory.getLogger(BigQueryCsvUploader.class);
    private static final String NAME_FILE_ORIGINAL = "DATOS.csv";
    private static final String TABLE_NAME = "tkdata";
    private static final String DATASET = "mydataset";
    private static final String PROJECT = "myproject";
    private static final String BUCKET = "mybucket";
    private static final String NAME_NEW_FILE = "tkdata.csv";
    private static final long MAX_SIZE_BYTES = 300 * 1024 * 1024; // 300 MB

   
    public static class Tkdata {
        private Date fchinf;
        private long numregs;
        private String valor;

        public Date getFchinf() { return fchinf; }
        public void setFchinf(String fchinf) throws ParseException {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            this.fchinf = sdf.parse(fchinf);
        }
        public long getNumregs() { return numregs; }
        public void setNumregs(String numregs) { this.numregs = Long.parseLong(numregs); }
        public String getValor() { return valor; }
        public void setValor(String valor) { this.valor = valor; }
    }

   
    public static boolean existFile(String bucketName, String fileName) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        Blob blob = storage.get(BlobId.of(bucketName, fileName));
        return blob != null && blob.exists();
    }

   
    public static boolean maxSize(String bucketName, 
     String fileName, long maxSizeBytes) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        Blob blob = storage.get(BlobId.of(bucketName, fileName));
        if (blob == null) {
            LOG.error("El archivo {} no existe en el bucket {}", fileName, bucketName);
            return false;
        }
        return blob.getSize() <= maxSizeBytes;
    }

    
    public static List<Tkdata> getListTkdata(String bucketName, String fileName) {
        List<Tkdata> listaTkdata = new ArrayList<>();
        Storage storage = StorageOptions.getDefaultInstance().getService();
        Blob blob = storage.get(BlobId.of(bucketName, fileName));
        if (blob == null) {
            LOG.error("Archivo{} no encontrado en el bucket {}", fileName, bucketName);
            return listaTkdata;
        }

        String content = new String(blob.getContent(), StandardCharsets.UTF_8);
        try (BufferedReader reader = new BufferedReader(new StringReader(content))) {
            String line;
            reader.readLine();
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split(",", -1); 
                if (parts.length == 3) {
                    Tkdata obj = new Tkdata();
                    try {
                        obj.setFchinf(parts[0].trim());
                        obj.setNumregs(parts[1].trim());
                        obj.setValor(parts[2].trim());
                        listaTkdata.add(obj);
                    } catch (ParseException | NumberFormatException e) {
                        LOG.error("Error parseando linea: {}", line, e);
                    }
                } else {
                    LOG.warn("Linea malformada: {}", line);
                }
            }
        } catch (IOException e) {
            LOG.error("Error al leer el contenido", e);
        }
        return listaTkdata;
    }

    
    public static StringBuilder convertToStringBuilder(List<Tkdata> listaTkdata) {
        StringBuilder sb = new StringBuilder();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        for (Tkdata item : listaTkdata) {
            sb.append(sdf.format(item.getFchinf())).append(",");
            sb.append(item.getNumregs()).append(",");
            sb.append(item.getValor()).append("\n");
        }
        return sb;
    }

   
    public static boolean creaNuevoFileCSV(String bucketName, 
       String fileName, String content) {
        try {
            Storage storage = StorageOptions.getDefaultInstance().getService();
            BlobId blobId = BlobId.of(bucketName, fileName);
            BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/csv").build();
            storage.create(blobInfo, content.getBytes(StandardCharsets.UTF_8));
            return true;
        } catch (Exception e) {
            LOG.error("Error al subir archivo {} al bucket {}", fileName, bucketName, e);
            return false;
        }
    }

    
    public static boolean loadCSVToTableBigQuery(String projectId, 
     String datasetId, String bucketName, String sourceFileName, String tableName) {
        try {
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
            TableId tableId = TableId.of(projectId, datasetId, tableName);

            
            Schema schema = Schema.of(
                Field.of("fchinf", StandardSQLTypeName.DATE),
                Field.of("numregs", StandardSQLTypeName.INT64),
                Field.of("valor", StandardSQLTypeName.STRING)
            );

           
            JobConfiguration jobConfig = LoadJobConfiguration.newBuilder(
                tableId,
                String.format("gs://%s/%s", bucketName, sourceFileName),
                FormatOptions.csv()
            )
                .setSchema(schema)
                .setSkipLeadingRows(0) 
                .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND) 
                .build();

            
            Job job = bigquery.create(JobInfo.of(jobConfig));
            job = job.waitFor();
            if (job.isDone() && job.getStatus().getError() == null) {
                LOG.info("CSV {} successfully loaded into BigQuery table {}.{}", 
                 sourceFileName, datasetId, tableName);
                return true;
            } else {
                LOG.error("Error cargando CSV a BigQuery: {}", job.getStatus().getError());
                return false;
            }
        } catch (Exception e) {
            LOG.error("Error en el proceso de carga", e);
            return false;
        }
    }

  
    public static boolean validateTableData(String projectId, 
      String datasetId, String tableName, List<Tkdata> expectedData) {
        try {
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
            String query = String.format("SELECT fchinf, numregs, valor FROM %s.%s.%s", 
                        projectId, datasetId, tableName);
            QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build();

           
            TableResult result = bigquery.query(queryConfig);
            long rowCount = result.getTotalRows();

            if (rowCount == 0) {
                LOG.error("No hay datos en la tabla {}.{}", datasetId, tableName);
                return false;
            }

           
            if (expectedData != null && rowCount != expectedData.size()) {
                LOG.warn("Error en conteo de datos {}, found {}", expectedData.size(), rowCount);
                return false;
            }

            
            LOG.info("Sample data from {}.{} (first 5 rows):", datasetId, tableName);
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            int maxRowsToLog = 5;
            int rowIndex = 0;
            for (FieldValueList row : result.iterateAll()) {
                if (rowIndex >= maxRowsToLog) break;
                String fchinf = row.get("fchinf").getStringValue(); 
                long numregs = row.get("numregs").getLongValue();
                String valor = row.get("valor").getStringValue();
                LOG.info("Row {}: fchinf={}, numregs={}, valor={}", rowIndex + 1, fchinf, numregs, valor);
                rowIndex++;
            }

            LOG.info("Validacion exitosa: {} rows found in table {}.{}", 
          rowCount, datasetId, tableName);
            return true;
        } catch (Exception e) {
            LOG.error("Error validando datos en la tabla {}.{}", datasetId, tableName, e);
            return false;
        }
    }

    public static void main(String[] args) {
     
        if (!existFile(BUCKET, NAME_FILE_ORIGINAL)) {
            LOG.error("Archivo{} no existe en el buckett {}.", NAME_FILE_ORIGINAL, BUCKET);
            return;
        }

        
        if (!maxSize(BUCKET, NAME_FILE_ORIGINAL, MAX_SIZE_BYTES)) {
            LOG.error("El archivo {} excede el tamaño: 300MB.", NAME_FILE_ORIGINAL);
            return;
        }

        
        List<Tkdata> listaTkdata = getListTkdata(BUCKET, NAME_FILE_ORIGINAL);
        if (listaTkdata.isEmpty()) {
            LOG.error("No hay datos para leer {}. ", NAME_FILE_ORIGINAL);
            return;
        }

        StringBuilder sb = convertToStringBuilder(listaTkdata);

        
        if (creaNuevoFileCSV(BUCKET, NAME_NEW_FILE, sb.toString())) {
            LOG.info("Archivo {} cargado al bucket {}", NAME_NEW_FILE, BUCKET);
            LOG.info("Carga de datos {} a la tabla: {}.{}", NAME_NEW_FILE, DATASET, TABLE_NAME);
            if (loadCSVToTableBigQuery(PROJECT, DATASET, BUCKET, NAME_NEW_FILE, TABLE_NAME)) {
                if (validateTableData(PROJECT, DATASET, TABLE_NAME, listaTkdata)) {
                    LOG.info("Validacion correcta {}.{}", DATASET, TABLE_NAME);
                } else {
                    LOG.error("Validacion fallida {}.{}", DATASET, TABLE_NAME);
                }
            } else {
                LOG.error("Fallo al cargar los datos a la tabla de BigQuery  {}.{}", DATASET, TABLE_NAME);
            }
        } else {
            LOG.error("Fallo al cargar el archivo {} al bucket {}", NAME_NEW_FILE, BUCKET);
        }
    }
}

Es importante hacer notar que los tipos de datos en el archivo deben concordar a los datos de la tabla. En caso contrario, no cargarán.

También es importante tener estas dependencias en el pom.xml

<dependencies>
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-storage</artifactId>
        <version>2.44.0</version>
    </dependency>
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-bigquery</artifactId>
        <version>2.44.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.13</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.13</version>
    </dependency>
</dependencies>

De preferncia usar Eclipse IDE para generar un .jar o ejecutarlo directamente.

Si la ejecución es correcta podremos ver los datos cargados en la tabla. Continuaremos más sobre BigQuery en próximas entregas.

Enlaces:

https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv

No hay comentarios:

Publicar un comentario

Vibe Coding (la programación vía IA): ¿el futuro de la programación?

Vibe Coding es un nuevo paradigma de programación, una nueva forma de crear código. Es un enfoque emergente en el desarrollo de sof...

Etiquetas

Archivo del blog