-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathimport.py
More file actions
27 lines (20 loc) · 759 Bytes
/
import.py
File metadata and controls
27 lines (20 loc) · 759 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import aiofiles
from aiocsv import AsyncDictReader
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk
from src import config
# Module-level async Elasticsearch client shared by the coroutines below.
# The single host URL is config.ES_URL followed by config.ES_INDEX and a
# trailing slash.
# NOTE(review): this appears to put the index name into the URL path so that
# bulk actions need no explicit "_index" -- confirm against the cluster setup.
es = AsyncElasticsearch(
    hosts=[
        f"{config.ES_URL}{config.ES_INDEX}/",
    ],
)
async def gendata():
    """Asynchronously yield the rows of ``posts_data.csv`` one at a time.

    The file is opened through ``aiofiles`` as UTF-8 text with ``newline=""``
    and parsed by ``AsyncDictReader``; every row the reader produces is
    yielded unchanged (each row is a dict).
    """
    source = aiofiles.open(
        "posts_data.csv", mode="r", encoding="utf-8", newline=""
    )
    async with source as csv_file:
        reader = AsyncDictReader(csv_file)
        async for record in reader:
            yield record
async def main():
    """Stream every row from ``gendata()`` to Elasticsearch and report failures.

    Feeds the rows to ``async_streaming_bulk`` using the module-level ``es``
    client and prints one line for each item the helper reports as not ok.

    NOTE(review): with the helper's default ``raise_on_error`` setting a
    failed item may raise instead of being yielded with ``ok`` false, in
    which case the print below never runs -- confirm the intended behavior.
    """
    outcomes = async_streaming_bulk(es, gendata())
    async for ok, item in outcomes:
        # Each yielded item is unpacked into its (action, detail) pair.
        action, detail = item.popitem()
        if ok:
            continue
        print("failed to %s document %s" % (action, detail))
# Script entry: run the import on a dedicated event loop, then release the
# resources the run acquired.  Previously neither the Elasticsearch client
# nor the event loop was ever closed, which leaks the client's HTTP session
# and the loop's resources on exit.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    try:
        # Close the client on the same loop that issued its requests.
        loop.run_until_complete(es.close())
        # Finalize any async generator (e.g. gendata) left suspended if
        # main() stopped early because of an error.
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        loop.close()