26
26
27
27
28
28
def _parse_doc_from_row(
    content_columns: Iterable[str],
    metadata_columns: Iterable[str],
    row: Dict,
    metadata_json_column: str = DEFAULT_METADATA_COL,
) -> Document:
    """Build a ``Document`` from one query-result row.

    Args:
        content_columns: Columns whose stringified values are joined with a
            single space to form ``page_content``. Missing columns are skipped.
        metadata_columns: Columns copied into the document metadata.
        row: Mapping of column name to value for one result row.
        metadata_json_column: Name of the column holding a dict of base
            metadata to unnest. Default: ``DEFAULT_METADATA_COL``.

    Returns:
        Document with ``page_content`` built from ``content_columns`` and
        metadata merged from the JSON column plus ``metadata_columns``.
    """
    page_content = " ".join(
        str(row[column]) for column in content_columns if column in row
    )
    metadata: Dict[str, Any] = {}
    # Unnest the JSON metadata column first so that explicitly listed
    # metadata columns (handled below) override any duplicate keys.
    json_metadata = row.get(metadata_json_column)
    if json_metadata:
        metadata.update(json_metadata)
    # Load metadata from the remaining columns.
    for column in metadata_columns:
        if column in row and column != metadata_json_column:
            metadata[column] = row[column]
    return Document(page_content=page_content, metadata=metadata)
44
47
45
48
46
def _parse_row_from_doc(
    column_names: Iterable[str],
    doc: Document,
    content_column: str = DEFAULT_CONTENT_COL,
    metadata_json_column: str = DEFAULT_METADATA_COL,
) -> Dict:
    """Convert a ``Document`` into a row dict keyed by table column names.

    Args:
        column_names: Columns present in the destination table.
        doc: The document to serialize.
        content_column: Column that receives ``page_content``.
            Default: ``DEFAULT_CONTENT_COL``.
        metadata_json_column: Column that receives leftover metadata as a
            dict. Default: ``DEFAULT_METADATA_COL``.

    Returns:
        Row dict with content, matching metadata columns, and any remaining
        metadata packed under ``metadata_json_column`` (if that column exists).
    """
    leftover = doc.metadata.copy()
    row: Dict[str, Any] = {content_column: doc.page_content}
    for key in doc.metadata:
        if key in column_names:
            # Move matching entries into dedicated columns.
            row[key] = leftover.pop(key)
    # store extra metadata in langchain_metadata column in json format
    if leftover and metadata_json_column in column_names:
        row[metadata_json_column] = leftover
    return row
57
65
58
66
@@ -66,6 +74,7 @@ def __init__(
66
74
query : str = "" ,
67
75
content_columns : Optional [List [str ]] = None ,
68
76
metadata_columns : Optional [List [str ]] = None ,
77
+ metadata_json_column : Optional [str ] = None ,
69
78
):
70
79
"""
71
80
Document page content defaults to the first column present in the query or table and
@@ -77,19 +86,22 @@ def __init__(
77
86
space-separated string concatenation.
78
87
79
88
Args:
80
- engine (MSSQLEngine): MSSQLEngine object to connect to the MSSQL database.
81
- table_name (str): The MSSQL database table name. (OneOf: table_name, query).
82
- query (str): The query to execute in MSSQL format. (OneOf: table_name, query).
83
- content_columns (List[str]): The columns to write into the `page_content`
84
- of the document. Optional.
85
- metadata_columns (List[str]): The columns to write into the `metadata` of the document.
86
- Optional.
89
+ engine (MSSQLEngine): MSSQLEngine object to connect to the MSSQL database.
90
+ table_name (str): The MSSQL database table name. (OneOf: table_name, query).
91
+ query (str): The query to execute in MSSQL format. (OneOf: table_name, query).
92
+ content_columns (List[str]): The columns to write into the `page_content`
93
+ of the document. Optional.
94
+ metadata_columns (List[str]): The columns to write into the `metadata` of the document.
95
+ Optional.
96
+ metadata_json_column (str): The name of the JSON column to use as the metadata’s base
97
+ dictionary. Default: `langchain_metadata`. Optional.
87
98
"""
88
99
self .engine = engine
89
100
self .table_name = table_name
90
101
self .query = query
91
102
self .content_columns = content_columns
92
103
self .metadata_columns = metadata_columns
104
+ self .metadata_json_column = metadata_json_column
93
105
if not self .table_name and not self .query :
94
106
raise ValueError ("One of 'table_name' or 'query' must be specified." )
95
107
if self .table_name and self .query :
@@ -128,6 +140,25 @@ def lazy_load(self) -> Iterator[Document]:
128
140
metadata_columns = self .metadata_columns or [
129
141
col for col in column_names if col not in content_columns
130
142
]
143
+ # check validity of metadata json column
144
+ if (
145
+ self .metadata_json_column
146
+ and self .metadata_json_column not in column_names
147
+ ):
148
+ raise ValueError (
149
+ f"Column { self .metadata_json_column } not found in query result { column_names } ."
150
+ )
151
+ # check validity of other column
152
+ all_names = content_columns + metadata_columns
153
+ for name in all_names :
154
+ if name not in column_names :
155
+ raise ValueError (
156
+ f"Column { name } not found in query result { column_names } ."
157
+ )
158
+ # use default metadata json column if not specified
159
+ metadata_json_column = self .metadata_json_column or DEFAULT_METADATA_COL
160
+
161
+ # load document one by one
131
162
while True :
132
163
row = result_proxy .fetchone ()
133
164
if not row :
@@ -136,11 +167,13 @@ def lazy_load(self) -> Iterator[Document]:
136
167
row_data = {}
137
168
for column in column_names :
138
169
value = getattr (row , column )
139
- if column == DEFAULT_METADATA_COL :
170
+ if column == metadata_json_column :
140
171
row_data [column ] = json .loads (value )
141
172
else :
142
173
row_data [column ] = value
143
- yield _parse_doc_from_row (content_columns , metadata_columns , row_data )
174
+ yield _parse_doc_from_row (
175
+ content_columns , metadata_columns , row_data , metadata_json_column
176
+ )
144
177
145
178
146
179
class MSSQLDocumentSaver :
@@ -150,6 +183,8 @@ def __init__(
150
183
self ,
151
184
engine : MSSQLEngine ,
152
185
table_name : str ,
186
+ content_column : Optional [str ] = None ,
187
+ metadata_json_column : Optional [str ] = None ,
153
188
):
154
189
"""
155
190
MSSQLDocumentSaver allows for saving of langchain documents in a database. If the table
@@ -160,14 +195,28 @@ def __init__(
160
195
Args:
161
196
engine: MSSQLEngine object to connect to the MSSQL database.
162
197
table_name: The name of table for saving documents.
198
+ content_column (str): The column to store document content.
199
+ Default: `page_content `. Optional.
200
+ metadata_json_column (str): The name of the JSON column to use as the metadata’s base
201
+ dictionary. Default: `langchain_metadata`. Optional.
163
202
"""
164
203
self .engine = engine
165
204
self .table_name = table_name
166
205
self ._table = self .engine ._load_document_table (table_name )
167
- if DEFAULT_CONTENT_COL not in self ._table .columns .keys ():
206
+ self .content_column = content_column or DEFAULT_CONTENT_COL
207
+ if self .content_column not in self ._table .columns .keys ():
208
+ raise ValueError (
209
+ f"Missing '{ self .content_column } ' field in table { table_name } ."
210
+ )
211
+ # check metadata_json_column existence if it's provided.
212
+ if (
213
+ metadata_json_column
214
+ and metadata_json_column not in self ._table .columns .keys ()
215
+ ):
168
216
raise ValueError (
169
- f"Missing ' { DEFAULT_CONTENT_COL } ' field in table { table_name } ."
217
+ f"Cannot find ' { metadata_json_column } ' column in table { table_name } ."
170
218
)
219
+ self .metadata_json_column = metadata_json_column or DEFAULT_METADATA_COL
171
220
172
221
def add_documents (self , docs : List [Document ]) -> None :
173
222
"""
@@ -179,9 +228,16 @@ def add_documents(self, docs: List[Document]) -> None:
179
228
"""
180
229
with self .engine .connect () as conn :
181
230
for doc in docs :
182
- row = _parse_row_from_doc (self ._table .columns .keys (), doc )
183
- if DEFAULT_METADATA_COL in row :
184
- row [DEFAULT_METADATA_COL ] = json .dumps (row [DEFAULT_METADATA_COL ])
231
+ row = _parse_row_from_doc (
232
+ self ._table .columns .keys (),
233
+ doc ,
234
+ self .content_column ,
235
+ self .metadata_json_column ,
236
+ )
237
+ if self .metadata_json_column in row :
238
+ row [self .metadata_json_column ] = json .dumps (
239
+ row [self .metadata_json_column ]
240
+ )
185
241
conn .execute (sqlalchemy .insert (self ._table ).values (row ))
186
242
conn .commit ()
187
243
@@ -195,9 +251,16 @@ def delete(self, docs: List[Document]) -> None:
195
251
"""
196
252
with self .engine .connect () as conn :
197
253
for doc in docs :
198
- row = _parse_row_from_doc (self ._table .columns .keys (), doc )
199
- if DEFAULT_METADATA_COL in row :
200
- row [DEFAULT_METADATA_COL ] = json .dumps (row [DEFAULT_METADATA_COL ])
254
+ row = _parse_row_from_doc (
255
+ self ._table .columns .keys (),
256
+ doc ,
257
+ self .content_column ,
258
+ self .metadata_json_column ,
259
+ )
260
+ if self .metadata_json_column in row :
261
+ row [self .metadata_json_column ] = json .dumps (
262
+ row [self .metadata_json_column ]
263
+ )
201
264
# delete by matching all fields of document
202
265
where_conditions = []
203
266
for col in self ._table .columns :
0 commit comments