En una entrega pasada vimos como cargar datos desde un CSV a una tabla en BigQuery.
Esta vez crearemos un programa con Java y BigQuery. Para ello necesitamos tener un archivo con datos el cual llamaremos "DATOS.txt". El contenido del archivo será similar a esto:
20250412,34,450.0
20250412,34,432.0
20250412,122,500.0
Tendremos una tabla llamada ``tkdata`` con los siguientes campos:
fchinf DATE
numregs INT64
valor STRING
Requisitos:
- Tener acceso a GCP BigQuery.
- Tener JDK 11 o más actual.
- Tener Maven (el más actual).
¿Qué hará el programa?
- Verificar si existe el archivo a subir ("DATOS.txt").
- Obtener su contenido y guardarlo en una lista tipo String.
- Crear un objeto tipo StringBuilder a partir de la lista tipo String.
- Enviar el contenido del objeto StringBuilder a un nuevo archivo ("tkdata.csv") que se guardará en el mismo bucket del archivo original.
- Cargar el contenido del nuevo archivo a la tabla ``tkdata``.
- Verificar que los datos hayan sido cargados a la tabla.
BigQueryCsvUploader.java
import com.google.cloud.bigquery.*;
import com.google.cloud.storage.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class BigQueryCsvUploader {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryCsvUploader.class);
private static final String NAME_FILE_ORIGINAL = "DATOS.csv";
private static final String TABLE_NAME = "tkdata";
private static final String DATASET = "mydataset";
private static final String PROJECT = "myproject";
private static final String BUCKET = "mybucket";
private static final String NAME_NEW_FILE = "tkdata.csv";
private static final long MAX_SIZE_BYTES = 300 * 1024 * 1024; // 300 MB
public static class Tkdata {
private Date fchinf;
private long numregs;
private String valor;
public Date getFchinf() { return fchinf; }
public void setFchinf(String fchinf) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
this.fchinf = sdf.parse(fchinf);
}
public long getNumregs() { return numregs; }
public void setNumregs(String numregs) { this.numregs = Long.parseLong(numregs); }
public String getValor() { return valor; }
public void setValor(String valor) { this.valor = valor; }
}
public static boolean existFile(String bucketName, String fileName) {
Storage storage = StorageOptions.getDefaultInstance().getService();
Blob blob = storage.get(BlobId.of(bucketName, fileName));
return blob != null && blob.exists();
}
public static boolean maxSize(String bucketName,
String fileName, long maxSizeBytes) {
Storage storage = StorageOptions.getDefaultInstance().getService();
Blob blob = storage.get(BlobId.of(bucketName, fileName));
if (blob == null) {
LOG.error("El archivo {} no existe en el bucket {}", fileName, bucketName);
return false;
}
return blob.getSize() <= maxSizeBytes;
}
public static List<Tkdata> getListTkdata(String bucketName, String fileName) {
List<Tkdata> listaTkdata = new ArrayList<>();
Storage storage = StorageOptions.getDefaultInstance().getService();
Blob blob = storage.get(BlobId.of(bucketName, fileName));
if (blob == null) {
LOG.error("Archivo{} no encontrado en el bucket {}", fileName, bucketName);
return listaTkdata;
}
String content = new String(blob.getContent(), StandardCharsets.UTF_8);
try (BufferedReader reader = new BufferedReader(new StringReader(content))) {
String line;
reader.readLine();
while ((line = reader.readLine()) != null) {
String[] parts = line.split(",", -1);
if (parts.length == 3) {
Tkdata obj = new Tkdata();
try {
obj.setFchinf(parts[0].trim());
obj.setNumregs(parts[1].trim());
obj.setValor(parts[2].trim());
listaTkdata.add(obj);
} catch (ParseException | NumberFormatException e) {
LOG.error("Error parseando linea: {}", line, e);
}
} else {
LOG.warn("Linea malformada: {}", line);
}
}
} catch (IOException e) {
LOG.error("Error al leer el contenido", e);
}
return listaTkdata;
}
public static StringBuilder convertToStringBuilder(List<Tkdata> listaTkdata) {
StringBuilder sb = new StringBuilder();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
for (Tkdata item : listaTkdata) {
sb.append(sdf.format(item.getFchinf())).append(",");
sb.append(item.getNumregs()).append(",");
sb.append(item.getValor()).append("\n");
}
return sb;
}
public static boolean creaNuevoFileCSV(String bucketName,
String fileName, String content) {
try {
Storage storage = StorageOptions.getDefaultInstance().getService();
BlobId blobId = BlobId.of(bucketName, fileName);
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/csv").build();
storage.create(blobInfo, content.getBytes(StandardCharsets.UTF_8));
return true;
} catch (Exception e) {
LOG.error("Error al subir archivo {} al bucket {}", fileName, bucketName, e);
return false;
}
}
public static boolean loadCSVToTableBigQuery(String projectId,
String datasetId, String bucketName, String sourceFileName, String tableName) {
try {
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
TableId tableId = TableId.of(projectId, datasetId, tableName);
Schema schema = Schema.of(
Field.of("fchinf", StandardSQLTypeName.DATE),
Field.of("numregs", StandardSQLTypeName.INT64),
Field.of("valor", StandardSQLTypeName.STRING)
);
JobConfiguration jobConfig = LoadJobConfiguration.newBuilder(
tableId,
String.format("gs://%s/%s", bucketName, sourceFileName),
FormatOptions.csv()
)
.setSchema(schema)
.setSkipLeadingRows(0)
.setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
.build();
Job job = bigquery.create(JobInfo.of(jobConfig));
job = job.waitFor();
if (job.isDone() && job.getStatus().getError() == null) {
LOG.info("CSV {} successfully loaded into BigQuery table {}.{}",
sourceFileName, datasetId, tableName);
return true;
} else {
LOG.error("Error cargando CSV a BigQuery: {}", job.getStatus().getError());
return false;
}
} catch (Exception e) {
LOG.error("Error en el proceso de carga", e);
return false;
}
}
public static boolean validateTableData(String projectId,
String datasetId, String tableName, List<Tkdata> expectedData) {
try {
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
String query = String.format("SELECT fchinf, numregs, valor FROM %s.%s.%s",
projectId, datasetId, tableName);
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build();
TableResult result = bigquery.query(queryConfig);
long rowCount = result.getTotalRows();
if (rowCount == 0) {
LOG.error("No hay datos en la tabla {}.{}", datasetId, tableName);
return false;
}
if (expectedData != null && rowCount != expectedData.size()) {
LOG.warn("Error en conteo de datos {}, found {}", expectedData.size(), rowCount);
return false;
}
LOG.info("Sample data from {}.{} (first 5 rows):", datasetId, tableName);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
int maxRowsToLog = 5;
int rowIndex = 0;
for (FieldValueList row : result.iterateAll()) {
if (rowIndex >= maxRowsToLog) break;
String fchinf = row.get("fchinf").getStringValue();
long numregs = row.get("numregs").getLongValue();
String valor = row.get("valor").getStringValue();
LOG.info("Row {}: fchinf={}, numregs={}, valor={}", rowIndex + 1, fchinf, numregs, valor);
rowIndex++;
}
LOG.info("Validacion exitosa: {} rows found in table {}.{}",
rowCount, datasetId, tableName);
return true;
} catch (Exception e) {
LOG.error("Error validando datos en la tabla {}.{}", datasetId, tableName, e);
return false;
}
}
public static void main(String[] args) {
if (!existFile(BUCKET, NAME_FILE_ORIGINAL)) {
LOG.error("Archivo{} no existe en el buckett {}.", NAME_FILE_ORIGINAL, BUCKET);
return;
}
if (!maxSize(BUCKET, NAME_FILE_ORIGINAL, MAX_SIZE_BYTES)) {
LOG.error("El archivo {} excede el tamaño: 300MB.", NAME_FILE_ORIGINAL);
return;
}
List<Tkdata> listaTkdata = getListTkdata(BUCKET, NAME_FILE_ORIGINAL);
if (listaTkdata.isEmpty()) {
LOG.error("No hay datos para leer {}. ", NAME_FILE_ORIGINAL);
return;
}
StringBuilder sb = convertToStringBuilder(listaTkdata);
if (creaNuevoFileCSV(BUCKET, NAME_NEW_FILE, sb.toString())) {
LOG.info("Archivo {} cargado al bucket {}", NAME_NEW_FILE, BUCKET);
LOG.info("Carga de datos {} a la tabla: {}.{}", NAME_NEW_FILE, DATASET, TABLE_NAME);
if (loadCSVToTableBigQuery(PROJECT, DATASET, BUCKET, NAME_NEW_FILE, TABLE_NAME)) {
if (validateTableData(PROJECT, DATASET, TABLE_NAME, listaTkdata)) {
LOG.info("Validacion correcta {}.{}", DATASET, TABLE_NAME);
} else {
LOG.error("Validacion fallida {}.{}", DATASET, TABLE_NAME);
}
} else {
LOG.error("Fallo al cargar los datos a la tabla de BigQuery {}.{}", DATASET, TABLE_NAME);
}
} else {
LOG.error("Fallo al cargar el archivo {} al bucket {}", NAME_NEW_FILE, BUCKET);
}
}
}
Es importante hacer notar que los tipos de datos en el archivo deben concordar a los datos de la tabla. En caso contrario, no cargarán.
También es importante tener estas dependencias en el pom.xml
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<version>2.44.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>2.44.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.13</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.13</version>
</dependency>
</dependencies>
De preferncia usar Eclipse IDE para generar un .jar o ejecutarlo directamente.
Si la ejecución es correcta podremos ver los datos cargados en la tabla. Continuaremos más sobre BigQuery en próximas entregas.
Enlaces:
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv