Examples
This page provides examples of accessing the Unstructured API via different methods.
For each of these examples, you’ll need:
These environment variables:
- UNSTRUCTURED_API_KEY - Your Unstructured API key value.
- UNSTRUCTURED_API_URL - Your Unstructured API URL.
If you do not specify the API URL, your Unstructured Serverless API pay-as-you-go account will be used by default. You must always specify your Serverless API key.
To use the Free Unstructured API, you must always specify your Free API key and the Free API URL: https://api.unstructured.io/general/v0/general
To use the pay-as-you-go Unstructured API on Azure or AWS with the SDKs, you must always specify the corresponding API URL. See the Azure or AWS instructions.
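The rules above can be sketched as a small helper that reads the two environment variables and fails early when the key is missing. The helper name `load_api_settings` is hypothetical and is not part of any Unstructured package. It only returns the key and the optional URL, so that a client can fall back to its default when no URL is set.

```python
import os
from typing import Mapping, Optional, Tuple


def load_api_settings(env: Mapping[str, str] = os.environ) -> Tuple[str, Optional[str]]:
    """Return (api_key, api_url) read from the given environment mapping.

    Hypothetical helper for illustration only.
    """
    api_key = env.get("UNSTRUCTURED_API_KEY")
    if not api_key:
        # The API key is always required, whichever API you target.
        raise ValueError("UNSTRUCTURED_API_KEY is not set")
    # The URL is optional here: None means "let the client use its default".
    api_url = env.get("UNSTRUCTURED_API_URL") or None
    return api_key, api_url
```

For the Free API and for the Azure or AWS offerings, treat a missing URL as an error instead of falling back, because those always require an explicit URL.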
Changing partition strategy for a PDF
Here’s how you can change the partition strategy for a PDF file and select an alternative model to use with the Unstructured API.
The hi_res strategy supports different models, and the default is layout_v1.1.0.
unstructured-ingest \
local \
--input-path $LOCAL_FILE_INPUT_DIR \
--output-dir $LOCAL_FILE_OUTPUT_DIR \
--strategy hi_res \
--hi-res-model-name layout_v1.1.0 \
--partition-by-api \
--api-key $UNSTRUCTURED_API_KEY \
--partition-endpoint $UNSTRUCTURED_API_URL \
--additional-partition-args="{\"split_pdf_page\":\"true\", \"split_pdf_allow_failed\":\"true\", \"split_pdf_concurrency_level\": 15}"
import os

from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig,
    LocalUploaderConfig
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig

if __name__ == "__main__":
    Pipeline.from_configs(
        context=ProcessorConfig(),
        indexer_config=LocalIndexerConfig(input_path=os.getenv("LOCAL_FILE_INPUT_DIR")),
        downloader_config=LocalDownloaderConfig(),
        source_connection_config=LocalConnectionConfig(),
        partitioner_config=PartitionerConfig(
            strategy="hi_res",
            # Same model name as in the CLI, curl, and SDK examples.
            hi_res_model_name="layout_v1.1.0",
            partition_by_api=True,
            api_key=os.getenv("UNSTRUCTURED_API_KEY"),
            partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
            additional_partition_args={
                "split_pdf_page": True,
                "split_pdf_allow_failed": True,
                "split_pdf_concurrency_level": 15
            }
        ),
        uploader_config=LocalUploaderConfig(output_dir=os.getenv("LOCAL_FILE_OUTPUT_DIR"))
    ).run()
Unstructured recommends that you use the Unstructured Ingest CLI or the Unstructured Ingest Python library if any of the following apply to you:
- You need to work with documents in cloud storage.
- You want to cache the results of processing multiple files in batches.
- You want more precise control over document-processing pipeline stages such as partitioning, chunking, filtering, staging, and embedding.
curl -X 'POST' "$UNSTRUCTURED_API_URL" \
  -H 'accept: application/json' \
  -H 'Content-Type: multipart/form-data' \
  -H "unstructured-api-key: $UNSTRUCTURED_API_KEY" \
  -F 'files=@sample-docs/layout-parser-paper.pdf' \
  -F 'strategy=hi_res' \
  -F 'hi_res_model_name=layout_v1.1.0'
import asyncio
import os
import json

import unstructured_client
from unstructured_client.models import shared

client = unstructured_client.UnstructuredClient(
    api_key_auth=os.getenv("UNSTRUCTURED_API_KEY"),
    server_url=os.getenv("UNSTRUCTURED_API_URL"),
)

async def call_api(filename, input_dir, output_dir):
    req = {
        "partition_parameters": {
            "files": {
                "content": open(filename, "rb"),
                "file_name": os.path.basename(filename),
            },
            "strategy": shared.Strategy.HI_RES,
            "hi_res_model_name": "layout_v1.1.0",
            "split_pdf_page": True,
            "split_pdf_allow_failed": True,
            "split_pdf_concurrency_level": 15
        }
    }

    try:
        res = await client.general.partition_async(request=req)
        element_dicts = [element for element in res.elements]
        json_elements = json.dumps(element_dicts, indent=2)

        # Create the output directory structure.
        relative_path = os.path.relpath(os.path.dirname(filename), input_dir)
        output_subdir = os.path.join(output_dir, relative_path)
        os.makedirs(output_subdir, exist_ok=True)

        # Write the output file.
        output_filename = os.path.join(output_subdir, os.path.basename(filename) + ".json")
        with open(output_filename, "w") as file:
            file.write(json_elements)

    except Exception as e:
        print(f"Error processing {filename}: {e}")

async def process_files(input_directory, output_directory):
    tasks = []

    for root, _, files in os.walk(input_directory):
        for file in files:
            if not file.endswith('.json'):
                full_path = os.path.join(root, file)
                tasks.append(call_api(full_path, input_directory, output_directory))

    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(process_files(
        input_directory=os.getenv("LOCAL_FILE_INPUT_DIR"),
        output_directory=os.getenv("LOCAL_FILE_OUTPUT_DIR")
    ))
import { UnstructuredClient } from "unstructured-client";
import * as fs from "fs";
import * as path from "path";
import { Strategy } from "unstructured-client/sdk/models/shared/index.js";
import { PartitionResponse } from "unstructured-client/sdk/models/operations";

// Send all files in the source path to Unstructured for processing.
// Send the processed data to the destination path.
function processFiles(
    client: UnstructuredClient,
    sourcePath: string,
    destinationPath: string
): void {

    // If an output directory does not exist for the corresponding input
    // directory, then create it.
    if (!fs.existsSync(destinationPath)) {
        fs.mkdirSync(destinationPath, { recursive: true });
    }

    // Get all folders and files at the current level of the input directory.
    const items = fs.readdirSync(sourcePath);

    // For each folder and file in the input directory...
    for (const item of items) {
        const inputPath = path.join(sourcePath, item);
        const outputPath = path.join(destinationPath, item);

        // If it's a folder, call this function recursively.
        if (fs.statSync(inputPath).isDirectory()) {
            processFiles(client, inputPath, outputPath);
        } else {
            // If it's a file, send it to Unstructured for processing.
            const data = fs.readFileSync(inputPath);

            client.general.partition({
                partitionParameters: {
                    files: {
                        content: data,
                        fileName: inputPath
                    },
                    strategy: Strategy.HiRes,
                    hiResModelName: "layout_v1.1.0",
                    splitPdfPage: true,
                    splitPdfConcurrencyLevel: 15,
                    splitPdfAllowFailed: true
                }
            }).then((res: PartitionResponse) => {
                // If successfully processed, write the processed data to
                // the destination directory.
                if (res.statusCode == 200) {
                    const jsonElements = JSON.stringify(res.elements, null, 2);
                    fs.writeFileSync(outputPath + ".json", jsonElements);
                }
            }).catch((e) => {
                if (e.statusCode) {
                    console.log(e.statusCode);
                    console.log(e.body);
                } else {
                    console.log(e);
                }
            });
        }
    }
}

const client = new UnstructuredClient({
    security: { apiKeyAuth: process.env.UNSTRUCTURED_API_KEY },
    serverURL: process.env.UNSTRUCTURED_API_URL
});

processFiles(
    client,
    process.env.LOCAL_FILE_INPUT_DIR,
    process.env.LOCAL_FILE_OUTPUT_DIR
);
If you have a local deployment of the Unstructured API, you can use other supported models, such as yolox.
Specifying the language of a document for better OCR results
For better OCR results, you can specify which languages your document is in by using the languages parameter.
View the list of available languages.
unstructured-ingest \
local \
--input-path $LOCAL_FILE_INPUT_DIR \
--output-dir $LOCAL_FILE_OUTPUT_DIR \
--strategy ocr_only \
--ocr-languages kor \
--partition-by-api \
--api-key $UNSTRUCTURED_API_KEY \
--partition-endpoint $UNSTRUCTURED_API_URL \
--additional-partition-args="{\"split_pdf_page\":\"true\", \"split_pdf_allow_failed\":\"true\", \"split_pdf_concurrency_level\": 15}"
import os

from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig,
    LocalUploaderConfig
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig

if __name__ == "__main__":
    Pipeline.from_configs(
        context=ProcessorConfig(),
        indexer_config=LocalIndexerConfig(input_path=os.getenv("LOCAL_FILE_INPUT_DIR")),
        downloader_config=LocalDownloaderConfig(),
        source_connection_config=LocalConnectionConfig(),
        partitioner_config=PartitionerConfig(
            strategy="ocr_only",
            ocr_languages=["kor"],
            partition_by_api=True,
            api_key=os.getenv("UNSTRUCTURED_API_KEY"),
            partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
            additional_partition_args={
                "split_pdf_page": True,
                "split_pdf_allow_failed": True,
                "split_pdf_concurrency_level": 15
            }
        ),
        uploader_config=LocalUploaderConfig(output_dir=os.getenv("LOCAL_FILE_OUTPUT_DIR"))
    ).run()
curl -X 'POST' "$UNSTRUCTURED_API_URL" \
  -H 'accept: application/json' \
  -H 'Content-Type: multipart/form-data' \
  -H "unstructured-api-key: $UNSTRUCTURED_API_KEY" \
  -F 'files=@sample-docs/korean.png' \
  -F 'strategy=ocr_only' \
  -F 'languages=kor'
import asyncio
import os
import json

import unstructured_client
from unstructured_client.models import shared

client = unstructured_client.UnstructuredClient(
    api_key_auth=os.getenv("UNSTRUCTURED_API_KEY"),
    server_url=os.getenv("UNSTRUCTURED_API_URL"),
)

async def call_api(filename, input_dir, output_dir):
    req = {
        "partition_parameters": {
            "files": {
                "content": open(filename, "rb"),
                "file_name": os.path.basename(filename),
            },
            "strategy": shared.Strategy.OCR_ONLY,
            "languages": ["kor"],
            "split_pdf_page": True,
            "split_pdf_allow_failed": True,
            "split_pdf_concurrency_level": 15
        }
    }

    try:
        res = await client.general.partition_async(request=req)
        element_dicts = [element for element in res.elements]
        json_elements = json.dumps(element_dicts, indent=2)

        # Create the output directory structure.
        relative_path = os.path.relpath(os.path.dirname(filename), input_dir)
        output_subdir = os.path.join(output_dir, relative_path)
        os.makedirs(output_subdir, exist_ok=True)

        # Write the output file.
        output_filename = os.path.join(output_subdir, os.path.basename(filename) + ".json")
        with open(output_filename, "w") as file:
            file.write(json_elements)

    except Exception as e:
        print(f"Error processing {filename}: {e}")

async def process_files(input_directory, output_directory):
    tasks = []

    for root, _, files in os.walk(input_directory):
        for file in files:
            if not file.endswith('.json'):
                full_path = os.path.join(root, file)
                tasks.append(call_api(full_path, input_directory, output_directory))

    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(process_files(
        input_directory=os.getenv("LOCAL_FILE_INPUT_DIR"),
        output_directory=os.getenv("LOCAL_FILE_OUTPUT_DIR")
    ))
import { UnstructuredClient } from "unstructured-client";
import * as fs from "fs";
import * as path from "path";
import { Strategy } from "unstructured-client/sdk/models/shared/index.js";
import { PartitionResponse } from "unstructured-client/sdk/models/operations";

// Send all files in the source path to Unstructured for processing.
// Send the processed data to the destination path.
function processFiles(
    client: UnstructuredClient,
    sourcePath: string,
    destinationPath: string
): void {

    // If an output directory does not exist for the corresponding input
    // directory, then create it.
    if (!fs.existsSync(destinationPath)) {
        fs.mkdirSync(destinationPath, { recursive: true });
    }

    // Get all folders and files at the current level of the input directory.
    const items = fs.readdirSync(sourcePath);

    // For each folder and file in the input directory...
    for (const item of items) {
        const inputPath = path.join(sourcePath, item);
        const outputPath = path.join(destinationPath, item);

        // If it's a folder, call this function recursively.
        if (fs.statSync(inputPath).isDirectory()) {
            processFiles(client, inputPath, outputPath);
        } else {
            // If it's a file, send it to Unstructured for processing.
            const data = fs.readFileSync(inputPath);

            client.general.partition({
                partitionParameters: {
                    files: {
                        content: data,
                        fileName: inputPath
                    },
                    strategy: Strategy.OcrOnly,
                    languages: ["kor"],
                    splitPdfPage: true,
                    splitPdfConcurrencyLevel: 15,
                    splitPdfAllowFailed: true
                }
            }).then((res: PartitionResponse) => {
                // If successfully processed, write the processed data to
                // the destination directory.
                if (res.statusCode == 200) {
                    const jsonElements = JSON.stringify(res.elements, null, 2);
                    fs.writeFileSync(outputPath + ".json", jsonElements);
                }
            }).catch((e) => {
                if (e.statusCode) {
                    console.log(e.statusCode);
                    console.log(e.body);
                } else {
                    console.log(e);
                }
            });
        }
    }
}

const client = new UnstructuredClient({
    security: { apiKeyAuth: process.env.UNSTRUCTURED_API_KEY },
    serverURL: process.env.UNSTRUCTURED_API_URL
});

processFiles(
    client,
    process.env.LOCAL_FILE_INPUT_DIR,
    process.env.LOCAL_FILE_OUTPUT_DIR
);
Saving bounding box coordinates
When elements are extracted from PDFs or images, it may be useful to get their bounding boxes as well.
Set the coordinates parameter to true to add this field to the elements in the response.
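Once coordinates are returned, a common next step is to reduce each element's polygon to a rectangle. The sketch below assumes that each element dictionary carries its corner points under metadata, then coordinates, then points, as a list of [x, y] pairs. That layout, the helper name `bounding_box`, and the sample element are assumptions for illustration, so check them against your own output.

```python
from typing import Any, Dict, Optional, Tuple

BBox = Tuple[float, float, float, float]


def bounding_box(element: Dict[str, Any]) -> Optional[BBox]:
    """Return (x_min, y_min, x_max, y_max), or None if no points are present."""
    coords = element.get("metadata", {}).get("coordinates") or {}
    points = coords.get("points")
    if not points:
        return None
    xs = [point[0] for point in points]
    ys = [point[1] for point in points]
    return (min(xs), min(ys), max(xs), max(ys))


# Made-up element in the assumed shape; real values depend on your document.
sample_element = {
    "type": "Title",
    "text": "Example title",
    "metadata": {
        "coordinates": {
            "points": [[10.0, 20.0], [10.0, 45.0], [110.0, 45.0], [110.0, 20.0]],
        }
    },
}

box = bounding_box(sample_element)
```

Elements without coordinates, for example those produced without the coordinates parameter, yield None rather than raising an error.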
unstructured-ingest \
local \
--input-path $LOCAL_FILE_INPUT_DIR \
--output-dir $LOCAL_FILE_OUTPUT_DIR \
--partition-by-api \
--api-key $UNSTRUCTURED_API_KEY \
--partition-endpoint $UNSTRUCTURED_API_URL \
--strategy hi_res \
--additional-partition-args="{\"coordinates\":\"true\", \"split_pdf_page\":\"true\", \"split_pdf_allow_failed\":\"true\", \"split_pdf_concurrency_level\": 15}"
import os

from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig,
    LocalUploaderConfig
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig

if __name__ == "__main__":
    Pipeline.from_configs(
        context=ProcessorConfig(),
        indexer_config=LocalIndexerConfig(input_path=os.getenv("LOCAL_FILE_INPUT_DIR")),
        downloader_config=LocalDownloaderConfig(),
        source_connection_config=LocalConnectionConfig(),
        partitioner_config=PartitionerConfig(
            partition_by_api=True,
            api_key=os.getenv("UNSTRUCTURED_API_KEY"),
            partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
            strategy="hi_res",
            additional_partition_args={
                "coordinates": True,
                "split_pdf_page": True,
                "split_pdf_allow_failed": True,
                "split_pdf_concurrency_level": 15
            }
        ),
        uploader_config=LocalUploaderConfig(output_dir=os.getenv("LOCAL_FILE_OUTPUT_DIR"))
    ).run()
curl -X 'POST' "$UNSTRUCTURED_API_URL" \
  -H 'accept: application/json' \
  -H 'Content-Type: multipart/form-data' \
  -H "unstructured-api-key: $UNSTRUCTURED_API_KEY" \
  -F 'files=@sample-docs/layout-parser-paper.pdf' \
  -F 'coordinates=true' \
  -F 'strategy=hi_res'
import asyncio
import os
import json

import unstructured_client
from unstructured_client.models import shared

client = unstructured_client.UnstructuredClient(
    api_key_auth=os.getenv("UNSTRUCTURED_API_KEY"),
    server_url=os.getenv("UNSTRUCTURED_API_URL"),
)

async def call_api(filename, input_dir, output_dir):
    req = {
        "partition_parameters": {
            "files": {
                "content": open(filename, "rb"),
                "file_name": os.path.basename(filename),
            },
            "strategy": shared.Strategy.HI_RES,
            "coordinates": True,
            "split_pdf_page": True,
            "split_pdf_allow_failed": True,
            "split_pdf_concurrency_level": 15
        }
    }

    try:
        res = await client.general.partition_async(request=req)
        element_dicts = [element for element in res.elements]
        json_elements = json.dumps(element_dicts, indent=2)

        # Create the output directory structure.
        relative_path = os.path.relpath(os.path.dirname(filename), input_dir)
        output_subdir = os.path.join(output_dir, relative_path)
        os.makedirs(output_subdir, exist_ok=True)

        # Write the output file.
        output_filename = os.path.join(output_subdir, os.path.basename(filename) + ".json")
        with open(output_filename, "w") as file:
            file.write(json_elements)

    except Exception as e:
        print(f"Error processing {filename}: {e}")

async def process_files(input_directory, output_directory):
    tasks = []

    for root, _, files in os.walk(input_directory):
        for file in files:
            if not file.endswith('.json'):
                full_path = os.path.join(root, file)
                tasks.append(call_api(full_path, input_directory, output_directory))

    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(process_files(
        input_directory=os.getenv("LOCAL_FILE_INPUT_DIR"),
        output_directory=os.getenv("LOCAL_FILE_OUTPUT_DIR")
    ))
import { UnstructuredClient } from "unstructured-client";
import * as fs from "fs";
import * as path from "path";
import { Strategy } from "unstructured-client/sdk/models/shared/index.js";
import { PartitionResponse } from "unstructured-client/sdk/models/operations";

// Send all files in the source path to Unstructured for processing.
// Send the processed data to the destination path.
function processFiles(
    client: UnstructuredClient,
    sourcePath: string,
    destinationPath: string
): void {

    // If an output directory does not exist for the corresponding input
    // directory, then create it.
    if (!fs.existsSync(destinationPath)) {
        fs.mkdirSync(destinationPath, { recursive: true });
    }

    // Get all folders and files at the current level of the input directory.
    const items = fs.readdirSync(sourcePath);

    // For each folder and file in the input directory...
    for (const item of items) {
        const inputPath = path.join(sourcePath, item);
        const outputPath = path.join(destinationPath, item);

        // If it's a folder, call this function recursively.
        if (fs.statSync(inputPath).isDirectory()) {
            processFiles(client, inputPath, outputPath);
        } else {
            // If it's a file, send it to Unstructured for processing.
            const data = fs.readFileSync(inputPath);

            client.general.partition({
                partitionParameters: {
                    files: {
                        content: data,
                        fileName: inputPath
                    },
                    strategy: Strategy.HiRes,
                    coordinates: true,
                    splitPdfPage: true,
                    splitPdfConcurrencyLevel: 15,
                    splitPdfAllowFailed: true
                }
            }).then((res: PartitionResponse) => {
                // If successfully processed, write the processed data to
                // the destination directory.
                if (res.statusCode == 200) {
                    const jsonElements = JSON.stringify(res.elements, null, 2);
                    fs.writeFileSync(outputPath + ".json", jsonElements);
                }
            }).catch((e) => {
                if (e.statusCode) {
                    console.log(e.statusCode);
                    console.log(e.body);
                } else {
                    console.log(e);
                }
            });
        }
    }
}

const client = new UnstructuredClient({
    security: { apiKeyAuth: process.env.UNSTRUCTURED_API_KEY },
    serverURL: process.env.UNSTRUCTURED_API_URL
});

processFiles(
    client,
    process.env.LOCAL_FILE_INPUT_DIR,
    process.env.LOCAL_FILE_OUTPUT_DIR
);
Returning unique element IDs
By default, the element ID is a SHA-256 hash of the element text, which ensures that the ID is deterministic. One downside is that the ID is not guaranteed to be unique: different elements with the same text will have the same ID, and there could also be hash collisions.
To use UUIDs in the output instead, set unique_element_ids=true. This can be helpful if you’d like to use the IDs as a primary key in a database, for example. Note that the element IDs will then be random, so every partition of the same file produces different IDs.
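The trade-off between the two ID styles can be illustrated with the Python standard library. This is only a sketch of the general idea: the helper names are hypothetical, and the exact input and formatting that the service uses for its hash-based IDs may differ from a plain SHA-256 hex digest of the text.

```python
import hashlib
import uuid


def deterministic_id(text: str) -> str:
    """Hash-based ID: the same text always produces the same ID."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def random_id() -> str:
    """UUID-based ID: a fresh value on every call, regardless of text."""
    return str(uuid.uuid4())


# Two different elements that happen to share the same text.
first = deterministic_id("Introduction")
second = deterministic_id("Introduction")
ids_collide = first == second            # identical text, identical ID

# UUIDs avoid that collision but are not reproducible across runs.
uuids_differ = random_id() != random_id()
```

If you need IDs that are both stable across runs and unique within a document, one option is to derive them from the text plus a position, such as the page number and element index, in your own post-processing.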
unstructured-ingest \
local \
--input-path $LOCAL_FILE_INPUT_DIR \
--output-dir $LOCAL_FILE_OUTPUT_DIR \
--partition-by-api \
--api-key $UNSTRUCTURED_API_KEY \
--partition-endpoint $UNSTRUCTURED_API_URL \
--strategy hi_res \
--additional-partition-args="{\"unique_element_ids\":\"true\", \"split_pdf_page\":\"true\", \"split_pdf_allow_failed\":\"true\", \"split_pdf_concurrency_level\": 15}"
import os

from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig,
    LocalUploaderConfig
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig

if __name__ == "__main__":
    Pipeline.from_configs(
        context=ProcessorConfig(),
        indexer_config=LocalIndexerConfig(input_path=os.getenv("LOCAL_FILE_INPUT_DIR")),
        downloader_config=LocalDownloaderConfig(),
        source_connection_config=LocalConnectionConfig(),
        partitioner_config=PartitionerConfig(
            partition_by_api=True,
            api_key=os.getenv("UNSTRUCTURED_API_KEY"),
            partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
            strategy="hi_res",
            additional_partition_args={
                "unique_element_ids": True,
                "split_pdf_page": True,
                "split_pdf_allow_failed": True,
                "split_pdf_concurrency_level": 15
            }
        ),
        uploader_config=LocalUploaderConfig(output_dir=os.getenv("LOCAL_FILE_OUTPUT_DIR"))
    ).run()
curl -X 'POST' "$UNSTRUCTURED_API_URL" \
  -H 'accept: application/json' \
  -H 'Content-Type: multipart/form-data' \
  -H "unstructured-api-key: $UNSTRUCTURED_API_KEY" \
  -F 'files=@sample-docs/layout-parser-paper-fast.pdf' \
  -F 'unique_element_ids=true'
import asyncio
import os
import json

import unstructured_client
from unstructured_client.models import shared

client = unstructured_client.UnstructuredClient(
    api_key_auth=os.getenv("UNSTRUCTURED_API_KEY"),
    server_url=os.getenv("UNSTRUCTURED_API_URL"),
)

async def call_api(filename, input_dir, output_dir):
    req = {
        "partition_parameters": {
            "files": {
                "content": open(filename, "rb"),
                "file_name": os.path.basename(filename),
            },
            "strategy": shared.Strategy.HI_RES,
            "unique_element_ids": True,
            "split_pdf_page": True,
            "split_pdf_allow_failed": True,
            "split_pdf_concurrency_level": 15
        }
    }

    try:
        res = await client.general.partition_async(request=req)
        element_dicts = [element for element in res.elements]
        json_elements = json.dumps(element_dicts, indent=2)

        # Create the output directory structure.
        relative_path = os.path.relpath(os.path.dirname(filename), input_dir)
        output_subdir = os.path.join(output_dir, relative_path)
        os.makedirs(output_subdir, exist_ok=True)

        # Write the output file.
        output_filename = os.path.join(output_subdir, os.path.basename(filename) + ".json")
        with open(output_filename, "w") as file:
            file.write(json_elements)

    except Exception as e:
        print(f"Error processing {filename}: {e}")

async def process_files(input_directory, output_directory):
    tasks = []

    for root, _, files in os.walk(input_directory):
        for file in files:
            if not file.endswith('.json'):
                full_path = os.path.join(root, file)
                tasks.append(call_api(full_path, input_directory, output_directory))

    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(process_files(
        input_directory=os.getenv("LOCAL_FILE_INPUT_DIR"),
        output_directory=os.getenv("LOCAL_FILE_OUTPUT_DIR")
    ))
import { UnstructuredClient } from "unstructured-client";
import * as fs from "fs";
import * as path from "path";
import { Strategy } from "unstructured-client/sdk/models/shared/index.js";
import { PartitionResponse } from "unstructured-client/sdk/models/operations";

// Send all files in the source path to Unstructured for processing.
// Send the processed data to the destination path.
function processFiles(
    client: UnstructuredClient,
    sourcePath: string,
    destinationPath: string
): void {

    // If an output directory does not exist for the corresponding input
    // directory, then create it.
    if (!fs.existsSync(destinationPath)) {
        fs.mkdirSync(destinationPath, { recursive: true });
    }

    // Get all folders and files at the current level of the input directory.
    const items = fs.readdirSync(sourcePath);

    // For each folder and file in the input directory...
    for (const item of items) {
        const inputPath = path.join(sourcePath, item);
        const outputPath = path.join(destinationPath, item);

        // If it's a folder, call this function recursively.
        if (fs.statSync(inputPath).isDirectory()) {
            processFiles(client, inputPath, outputPath);
        } else {
            // If it's a file, send it to Unstructured for processing.
            const data = fs.readFileSync(inputPath);

            client.general.partition({
                partitionParameters: {
                    files: {
                        content: data,
                        fileName: inputPath
                    },
                    uniqueElementIds: true,
                    strategy: Strategy.HiRes,
                    splitPdfPage: true,
                    splitPdfConcurrencyLevel: 15,
                    splitPdfAllowFailed: true
                }
            }).then((res: PartitionResponse) => {
                // If successfully processed, write the processed data to
                // the destination directory.
                if (res.statusCode == 200) {
                    const jsonElements = JSON.stringify(res.elements, null, 2);
                    fs.writeFileSync(outputPath + ".json", jsonElements);
                }
            }).catch((e) => {
                if (e.statusCode) {
                    console.log(e.statusCode);
                    console.log(e.body);
                } else {
                    console.log(e);
                }
            });
        }
    }
}

const client = new UnstructuredClient({
    security: { apiKeyAuth: process.env.UNSTRUCTURED_API_KEY },
    serverURL: process.env.UNSTRUCTURED_API_URL
});

processFiles(
    client,
    process.env.LOCAL_FILE_INPUT_DIR,
    process.env.LOCAL_FILE_OUTPUT_DIR
);
Adding the chunking step after partitioning
You can combine partitioning and subsequent chunking in a single request by setting the chunking_strategy parameter.
By default, chunking_strategy is set to None, and no chunking is performed.
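To give a feel for what a title-based strategy with a character limit does, here is a deliberately simplified sketch. It starts a new chunk at each Title element and also whenever adding the next element would exceed the limit. The function name `chunk_by_title` and the element shape (dictionaries with type and text keys) are assumptions for illustration; this is not the service's actual chunking algorithm, which handles more cases.

```python
from typing import Dict, List


def chunk_by_title(elements: List[Dict[str, str]], max_characters: int = 1024) -> List[str]:
    """Group element texts into chunks, breaking at titles and at the size limit.

    Simplified illustration: a single element longer than max_characters
    is kept whole rather than split.
    """
    chunks: List[str] = []
    current = ""
    for element in elements:
        text = element["text"]
        starts_section = element["type"] == "Title"
        candidate = f"{current}\n\n{text}" if current else text
        if current and (starts_section or len(candidate) > max_characters):
            # Close the chunk in progress and start a new one with this element.
            chunks.append(current)
            current = text
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks


# Made-up elements in the assumed shape.
sample_elements = [
    {"type": "Title", "text": "Intro"},
    {"type": "NarrativeText", "text": "First paragraph."},
    {"type": "Title", "text": "Methods"},
    {"type": "NarrativeText", "text": "Second paragraph."},
]

sample_chunks = chunk_by_title(sample_elements)
```

With the sample elements, each title and the paragraph that follows it end up in the same chunk, so the result contains two chunks.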
unstructured-ingest \
local \
--input-path $LOCAL_FILE_INPUT_DIR \
--output-dir $LOCAL_FILE_OUTPUT_DIR \
--chunking-strategy by_title \
--chunk-max-characters 1024 \
--partition-by-api \
--api-key $UNSTRUCTURED_API_KEY \
--partition-endpoint $UNSTRUCTURED_API_URL \
--strategy hi_res \
--additional-partition-args="{\"split_pdf_page\":\"true\", \"split_pdf_allow_failed\":\"true\", \"split_pdf_concurrency_level\": 15}"
import os

from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig,
    LocalUploaderConfig
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
from unstructured_ingest.v2.processes.chunker import ChunkerConfig

if __name__ == "__main__":
    Pipeline.from_configs(
        context=ProcessorConfig(),
        indexer_config=LocalIndexerConfig(input_path=os.getenv("LOCAL_FILE_INPUT_DIR")),
        downloader_config=LocalDownloaderConfig(),
        source_connection_config=LocalConnectionConfig(),
        partitioner_config=PartitionerConfig(
            partition_by_api=True,
            api_key=os.getenv("UNSTRUCTURED_API_KEY"),
            partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
            strategy="hi_res",
            additional_partition_args={
                "split_pdf_page": True,
                "split_pdf_allow_failed": True,
                "split_pdf_concurrency_level": 15
            }
        ),
        chunker_config=ChunkerConfig(
            chunking_strategy="by_title",
            chunk_max_characters=1024
        ),
        uploader_config=LocalUploaderConfig(output_dir=os.getenv("LOCAL_FILE_OUTPUT_DIR"))
    ).run()
curl -X 'POST' $UNSTRUCTURED_API_URL \
-H 'accept: application/json' \
-H 'Content-Type: multipart/form-data' \
-H "unstructured-api-key: $UNSTRUCTURED_API_KEY" \
-F 'files=@sample-docs/layout-parser-paper-fast.pdf' \
-F 'chunking_strategy=by_title' \
-F 'max_characters=1024' \
-F 'strategy=hi_res'
import asyncio
import os
import json

import unstructured_client
from unstructured_client.models import shared

client = unstructured_client.UnstructuredClient(
    api_key_auth=os.getenv("UNSTRUCTURED_API_KEY"),
    server_url=os.getenv("UNSTRUCTURED_API_URL"),
)

async def call_api(filename, input_dir, output_dir):
    # Read the file up front so the handle is closed before the request is sent.
    with open(filename, "rb") as f:
        content = f.read()

    req = {
        "partition_parameters": {
            "files": {
                "content": content,
                "file_name": os.path.basename(filename),
            },
            "chunking_strategy": "by_title",
            "max_characters": 1024,
            "strategy": shared.Strategy.HI_RES,
            "split_pdf_page": True,
            "split_pdf_allow_failed": True,
            "split_pdf_concurrency_level": 15
        }
    }

    try:
        res = await client.general.partition_async(request=req)
        element_dicts = [element for element in res.elements]
        json_elements = json.dumps(element_dicts, indent=2)

        # Create the output directory structure.
        relative_path = os.path.relpath(os.path.dirname(filename), input_dir)
        output_subdir = os.path.join(output_dir, relative_path)
        os.makedirs(output_subdir, exist_ok=True)

        # Write the output file.
        output_filename = os.path.join(output_subdir, os.path.basename(filename) + ".json")
        with open(output_filename, "w") as file:
            file.write(json_elements)

    except Exception as e:
        print(f"Error processing {filename}: {e}")

async def process_files(input_directory, output_directory):
    # Queue one API call per file, skipping any existing .json files.
    tasks = []

    for root, _, files in os.walk(input_directory):
        for file in files:
            if not file.endswith('.json'):
                full_path = os.path.join(root, file)
                tasks.append(call_api(full_path, input_directory, output_directory))

    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(process_files(
        input_directory=os.getenv("LOCAL_FILE_INPUT_DIR"),
        output_directory=os.getenv("LOCAL_FILE_OUTPUT_DIR")
    ))
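The script above schedules one request per file and awaits them all with `asyncio.gather`, so a large input directory results in many simultaneous uploads. One possible way to cap that, sketched below under the assumption that a fixed limit suits your account and network, is to wrap each call in an `asyncio.Semaphore`. The names `run_bounded`, `max_in_flight`, and `fake_call` are illustrative only; `fake_call` stands in for `call_api` so that the sketch runs without an API key.

```python
import asyncio


async def run_bounded(coros, max_in_flight=5):
    """Await all coroutines, running at most max_in_flight at a time."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def guarded(coro):
        # Each coroutine waits here until a slot is free.
        async with semaphore:
            return await coro

    # gather returns results in the same order as the input coroutines.
    return await asyncio.gather(*(guarded(c) for c in coros))


async def fake_call(name):
    # Placeholder for call_api(...); sleeps instead of calling the API.
    await asyncio.sleep(0.01)
    return name


if __name__ == "__main__":
    names = ["a.pdf", "b.pdf", "c.pdf"]
    results = asyncio.run(run_bounded([fake_call(n) for n in names], max_in_flight=2))
    print(results)
```

In the script above, this would amount to replacing `await asyncio.gather(*tasks)` with `await run_bounded(tasks)`. Note that this limit is separate from `split_pdf_concurrency_level`, which applies to the pages of a single PDF.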
import { UnstructuredClient } from "unstructured-client";
import * as fs from "fs";
import * as path from "path";
import { ChunkingStrategy, Strategy } from "unstructured-client/sdk/models/shared/index.js";
import { PartitionResponse } from "unstructured-client/sdk/models/operations";

// Send all files in the source path to Unstructured for processing.
// Send the processed data to the destination path.
function processFiles(
    client: UnstructuredClient,
    sourcePath: string,
    destinationPath: string
): void {

    // If an output directory does not exist for the corresponding input
    // directory, then create it.
    if (!fs.existsSync(destinationPath)) {
        fs.mkdirSync(destinationPath, { recursive: true });
    }

    // Get all folders and files at the current level of the input directory.
    const items = fs.readdirSync(sourcePath);

    // For each folder and file in the input directory...
    for (const item of items) {
        const inputPath = path.join(sourcePath, item);
        const outputPath = path.join(destinationPath, item);

        // If it's a folder, call this function recursively.
        if (fs.statSync(inputPath).isDirectory()) {
            processFiles(client, inputPath, outputPath);
        } else {
            // If it's a file, send it to Unstructured for processing.
            const data = fs.readFileSync(inputPath);

            client.general.partition({
                partitionParameters: {
                    files: {
                        content: data,
                        fileName: inputPath
                    },
                    strategy: Strategy.HiRes,
                    chunkingStrategy: ChunkingStrategy.ByTitle,
                    maxCharacters: 1024,
                    splitPdfPage: true,
                    splitPdfConcurrencyLevel: 15,
                    splitPdfAllowFailed: true
                }
            }).then((res: PartitionResponse) => {
                // If successfully processed, write the processed data to
                // the destination directory.
                if (res.statusCode == 200) {
                    const jsonElements = JSON.stringify(res.elements, null, 2);
                    fs.writeFileSync(outputPath + ".json", jsonElements);
                }
            }).catch((e) => {
                if (e.statusCode) {
                    console.log(e.statusCode);
                    console.log(e.body);
                } else {
                    console.log(e);
                }
            });
        }
    }
}

const client = new UnstructuredClient({
    security: { apiKeyAuth: process.env.UNSTRUCTURED_API_KEY },
    serverURL: process.env.UNSTRUCTURED_API_URL
});

processFiles(
    client,
    process.env.LOCAL_FILE_INPUT_DIR,
    process.env.LOCAL_FILE_OUTPUT_DIR
);
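Each example above requests `by_title` chunking with a maximum of 1024 characters, and the ingest and SDK examples write the returned elements to a `.json` file. As a rough sanity check, the sketch below scans one such file for chunks over the limit. It assumes the file holds a JSON array of element objects that each carry a `text` field; the function names `oversized_chunks` and `check_file` and the sample data are made up for illustration.

```python
import json


def oversized_chunks(elements, max_characters=1024):
    """Return indexes of elements whose text exceeds max_characters."""
    return [
        index
        for index, element in enumerate(elements)
        if len(element.get("text", "")) > max_characters
    ]


def check_file(path, max_characters=1024):
    """Load one output file and report any oversized chunks."""
    with open(path, encoding="utf-8") as file:
        return oversized_chunks(json.load(file), max_characters)


if __name__ == "__main__":
    # Made-up sample standing in for real API output.
    sample = [{"text": "short chunk"}, {"text": "x" * 2000}]
    print(oversized_chunks(sample))
```

An empty list from `check_file` would suggest that the chunking settings were applied as requested for that file.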