En una entrega pasada vimos como cargar datos desde un CSV a una tabla en BigQuery.
Esta vez crearemos un programa con Java y BigQuery. Para ello necesitamos tener un archivo con datos el cual llamaremos "DATOS.txt". El contenido del archivo será similar a esto:
20250412,34,450.0 20250412,34,432.0 20250412,122,500.0
Tendremos una tabla llamada ``tkdata`` con los siguientes campos:
fchinf DATE numregs INT64 valor STRING
Requisitos:
- Tener acceso a GCP BigQuery.
- Tener JDK 11 o más actual.
- Tener Maven (el más actual).
¿Qué hará el programa?
- Verificar si existe el archivo a subir ("DATOS.txt").
- Obtener su contenido y guardarlo en una lista tipo String.
- Crear un objeto tipo StringBuilder a partir de la lista tipo String.
- Enviar el contenido del objeto StringBuilder a un nuevo archivo ("tkdata.csv") que se guardará en el mismo bucket del archivo original.
- Cargar el contenido del nuevo archivo a la tabla ``tkdata``.
- Verificar que los datos hayan sido cargados a la tabla.
BigQueryCsvUploader.java
import com.google.cloud.bigquery.*; import com.google.cloud.storage.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException; import java.io.StringReader; import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; public class BigQueryCsvUploader { private static final Logger LOG = LoggerFactory.getLogger(BigQueryCsvUploader.class); private static final String NAME_FILE_ORIGINAL = "DATOS.csv"; private static final String TABLE_NAME = "tkdata"; private static final String DATASET = "mydataset"; private static final String PROJECT = "myproject"; private static final String BUCKET = "mybucket"; private static final String NAME_NEW_FILE = "tkdata.csv"; private static final long MAX_SIZE_BYTES = 300 * 1024 * 1024; // 300 MB public static class Tkdata { private Date fchinf; private long numregs; private String valor; public Date getFchinf() { return fchinf; } public void setFchinf(String fchinf) throws ParseException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); this.fchinf = sdf.parse(fchinf); } public long getNumregs() { return numregs; } public void setNumregs(String numregs) { this.numregs = Long.parseLong(numregs); } public String getValor() { return valor; } public void setValor(String valor) { this.valor = valor; } } public static boolean existFile(String bucketName, String fileName) { Storage storage = StorageOptions.getDefaultInstance().getService(); Blob blob = storage.get(BlobId.of(bucketName, fileName)); return blob != null && blob.exists(); } public static boolean maxSize(String bucketName, String fileName, long maxSizeBytes) { Storage storage = StorageOptions.getDefaultInstance().getService(); Blob blob = storage.get(BlobId.of(bucketName, fileName)); if (blob == null) { LOG.error("El archivo {} no existe en el bucket {}", fileName, bucketName); return false; } return blob.getSize() <= maxSizeBytes; } public static List<Tkdata> getListTkdata(String bucketName, String fileName) { List<Tkdata> listaTkdata = new ArrayList<>(); Storage storage = StorageOptions.getDefaultInstance().getService(); Blob blob = storage.get(BlobId.of(bucketName, fileName)); if (blob == null) { LOG.error("Archivo{} no encontrado en el bucket {}", fileName, bucketName); return listaTkdata; } String content = new String(blob.getContent(), StandardCharsets.UTF_8); try (BufferedReader reader = new BufferedReader(new StringReader(content))) { String line; reader.readLine(); while ((line = reader.readLine()) != null) { String[] parts = line.split(",", -1); if (parts.length == 3) { Tkdata obj = new Tkdata(); try { obj.setFchinf(parts[0].trim()); obj.setNumregs(parts[1].trim()); obj.setValor(parts[2].trim()); listaTkdata.add(obj); } catch (ParseException | NumberFormatException e) { LOG.error("Error parseando linea: {}", line, e); } } else { LOG.warn("Linea malformada: {}", line); } } } catch (IOException e) { LOG.error("Error al leer el contenido", e); } return listaTkdata; } public static StringBuilder convertToStringBuilder(List<Tkdata> listaTkdata) { StringBuilder sb = new StringBuilder(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); for (Tkdata item : listaTkdata) { sb.append(sdf.format(item.getFchinf())).append(","); sb.append(item.getNumregs()).append(","); sb.append(item.getValor()).append("\n"); } return sb; } public static boolean creaNuevoFileCSV(String bucketName, String fileName, String content) { try { Storage storage = StorageOptions.getDefaultInstance().getService(); BlobId blobId = BlobId.of(bucketName, fileName); BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/csv").build(); storage.create(blobInfo, content.getBytes(StandardCharsets.UTF_8)); return true; } catch (Exception e) { LOG.error("Error al subir archivo {} al bucket {}", fileName, bucketName, e); return false; } } public static boolean loadCSVToTableBigQuery(String projectId, String datasetId, String bucketName, String sourceFileName, String tableName) { try { BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); TableId tableId = TableId.of(projectId, datasetId, tableName); Schema schema = Schema.of( Field.of("fchinf", StandardSQLTypeName.DATE), Field.of("numregs", StandardSQLTypeName.INT64), Field.of("valor", StandardSQLTypeName.STRING) ); JobConfiguration jobConfig = LoadJobConfiguration.newBuilder( tableId, String.format("gs://%s/%s", bucketName, sourceFileName), FormatOptions.csv() ) .setSchema(schema) .setSkipLeadingRows(0) .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND) .build(); Job job = bigquery.create(JobInfo.of(jobConfig)); job = job.waitFor(); if (job.isDone() && job.getStatus().getError() == null) { LOG.info("CSV {} successfully loaded into BigQuery table {}.{}", sourceFileName, datasetId, tableName); return true; } else { LOG.error("Error cargando CSV a BigQuery: {}", job.getStatus().getError()); return false; } } catch (Exception e) { LOG.error("Error en el proceso de carga", e); return false; } } public static boolean validateTableData(String projectId, String datasetId, String tableName, List<Tkdata> expectedData) { try { BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); String query = String.format("SELECT fchinf, numregs, valor FROM %s.%s.%s", projectId, datasetId, tableName); QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build(); TableResult result = bigquery.query(queryConfig); long rowCount = result.getTotalRows(); if (rowCount == 0) { LOG.error("No hay datos en la tabla {}.{}", datasetId, tableName); return false; } if (expectedData != null && rowCount != expectedData.size()) { LOG.warn("Error en conteo de datos {}, found {}", expectedData.size(), rowCount); return false; } LOG.info("Sample data from {}.{} (first 5 rows):", datasetId, tableName); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); int maxRowsToLog = 5; int rowIndex = 0; for (FieldValueList row : result.iterateAll()) { if (rowIndex >= maxRowsToLog) break; String fchinf = row.get("fchinf").getStringValue(); long numregs = row.get("numregs").getLongValue(); String valor = row.get("valor").getStringValue(); LOG.info("Row {}: fchinf={}, numregs={}, valor={}", rowIndex + 1, fchinf, numregs, valor); rowIndex++; } LOG.info("Validacion exitosa: {} rows found in table {}.{}", rowCount, datasetId, tableName); return true; } catch (Exception e) { LOG.error("Error validando datos en la tabla {}.{}", datasetId, tableName, e); return false; } } public static void main(String[] args) { if (!existFile(BUCKET, NAME_FILE_ORIGINAL)) { LOG.error("Archivo{} no existe en el buckett {}.", NAME_FILE_ORIGINAL, BUCKET); return; } if (!maxSize(BUCKET, NAME_FILE_ORIGINAL, MAX_SIZE_BYTES)) { LOG.error("El archivo {} excede el tamaño: 300MB.", NAME_FILE_ORIGINAL); return; } List<Tkdata> listaTkdata = getListTkdata(BUCKET, NAME_FILE_ORIGINAL); if (listaTkdata.isEmpty()) { LOG.error("No hay datos para leer {}. ", NAME_FILE_ORIGINAL); return; } StringBuilder sb = convertToStringBuilder(listaTkdata); if (creaNuevoFileCSV(BUCKET, NAME_NEW_FILE, sb.toString())) { LOG.info("Archivo {} cargado al bucket {}", NAME_NEW_FILE, BUCKET); LOG.info("Carga de datos {} a la tabla: {}.{}", NAME_NEW_FILE, DATASET, TABLE_NAME); if (loadCSVToTableBigQuery(PROJECT, DATASET, BUCKET, NAME_NEW_FILE, TABLE_NAME)) { if (validateTableData(PROJECT, DATASET, TABLE_NAME, listaTkdata)) { LOG.info("Validacion correcta {}.{}", DATASET, TABLE_NAME); } else { LOG.error("Validacion fallida {}.{}", DATASET, TABLE_NAME); } } else { LOG.error("Fallo al cargar los datos a la tabla de BigQuery {}.{}", DATASET, TABLE_NAME); } } else { LOG.error("Fallo al cargar el archivo {} al bucket {}", NAME_NEW_FILE, BUCKET); } } }
Es importante hacer notar que los tipos de datos en el archivo deben concordar a los datos de la tabla. En caso contrario, no cargarán.
También es importante tener estas dependencias en el pom.xml
<dependencies> <dependency> <groupId>com.google.cloud</groupId> <artifactId>google-cloud-storage</artifactId> <version>2.44.0</version> </dependency> <dependency> <groupId>com.google.cloud</groupId> <artifactId>google-cloud-bigquery</artifactId> <version>2.44.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>2.0.13</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>2.0.13</version> </dependency> </dependencies>
De preferncia usar Eclipse IDE para generar un .jar o ejecutarlo directamente.
Si la ejecución es correcta podremos ver los datos cargados en la tabla. Continuaremos más sobre BigQuery en próximas entregas.
Enlaces:
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv
No hay comentarios:
Publicar un comentario