view blocks flow

This commit is contained in:
Chris Anderson 2008-10-14 01:07:48 -07:00
parent 54a0afdf8e
commit 254eb20161
11 changed files with 124 additions and 28 deletions

View file

@ -5,8 +5,9 @@ module CouchRest
class Database
attr_reader :server, :host, :name, :root
# Create a CouchRest::Database adapter for the supplied CouchRest::Server and database name.
#
# Create a CouchRest::Database adapter for the supplied CouchRest::Server
# and database name.
#
# ==== Parameters
# server<CouchRest::Server>:: database host
# name<String>:: database name
@ -40,7 +41,9 @@ module CouchRest
end
end
# POST a temporary view function to CouchDB for querying. This is not recommended, as you don't get any performance benefit from CouchDB's materialized views. Can be quite slow on large databases.
# POST a temporary view function to CouchDB for querying. This is not
# recommended, as you don't get any performance benefit from CouchDB's
# materialized views. Can be quite slow on large databases.
def temp_view funcs, params = {}
keys = params.delete(:keys)
funcs = funcs.merge({:keys => keys}) if keys
@ -48,7 +51,8 @@ module CouchRest
JSON.parse(RestClient.post(url, funcs.to_json, {"Content-Type" => 'application/json'}))
end
# Query a CouchDB view as defined by a <tt>_design</tt> document. Accepts paramaters as described in http://wiki.apache.org/couchdb/HttpViewApi
# Query a CouchDB view as defined by a <tt>_design</tt> document. Accepts
# paramaters as described in http://wiki.apache.org/couchdb/HttpViewApi
def view name, params = {}, &block
keys = params.delete(:keys)
url = CouchRest.paramify_url "#{@root}/_view/#{name}", params
@ -56,6 +60,7 @@ module CouchRest
CouchRest.post(url, {:keys => keys})
else
if block_given?
puts "streamer"
@streamer.view(name, params, &block)
else
CouchRest.get url
@ -89,7 +94,12 @@ module CouchRest
JSON.parse(RestClient.put(uri, file, options))
end
# Save a document to CouchDB. This will use the <tt>_id</tt> field from the document as the id for PUT, or request a new UUID from CouchDB, if no <tt>_id</tt> is present on the document. IDs are attached to documents on the client side because POST has the curious property of being automatically retried by proxies in the event of network segmentation and lost responses.
# Save a document to CouchDB. This will use the <tt>_id</tt> field from
# the document as the id for PUT, or request a new UUID from CouchDB, if
# no <tt>_id</tt> is present on the document. IDs are attached to
# documents on the client side because POST has the curious property of
# being automatically retried by proxies in the event of network
# segmentation and lost responses.
def save doc
if doc['_attachments']
doc['_attachments'] = encode_attachments(doc['_attachments'])
@ -107,7 +117,8 @@ module CouchRest
end
end
# POST an array of documents to CouchDB. If any of the documents are missing ids, supply one from the uuid cache.
# POST an array of documents to CouchDB. If any of the documents are
# missing ids, supply one from the uuid cache.
def bulk_save docs
ids, noids = docs.partition{|d|d['_id']}
uuid_count = [noids.length, @server.uuid_batch_count].max
@ -118,13 +129,15 @@ module CouchRest
CouchRest.post "#{@root}/_bulk_docs", {:docs => docs}
end
# DELETE the document from CouchDB that has the given <tt>_id</tt> and <tt>_rev</tt>.
# DELETE the document from CouchDB that has the given <tt>_id</tt> and
# <tt>_rev</tt>.
def delete doc
slug = CGI.escape(doc['_id'])
CouchRest.delete "#{@root}/#{slug}?rev=#{doc['_rev']}"
end
# DELETE the database itself. This is not undoable and could be rather catastrophic. Use with care!
# DELETE the database itself. This is not undoable and could be rather
# catastrophic. Use with care!
def delete!
CouchRest.delete @root
end

View file

@ -302,6 +302,9 @@ module CouchRest
self.meta_class.instance_eval do
define_method method_name do |*args|
# block = args.pop if args.last.is_a?(Proc)
block = nil
puts "block" if block_given?
query = opts.merge(args[0] || {})
query[:raw] = true if query[:reduce]
unless design_doc_fresh
@ -309,7 +312,7 @@ module CouchRest
end
raw = query.delete(:raw)
view_name = "#{design_doc_slug}/#{method_name}"
fetch_view_with_docs(view_name, query, raw)
fetch_view_with_docs(view_name, query, raw, &block)
end
end
end
@ -319,28 +322,38 @@ module CouchRest
database.get("_design/#{design_doc_slug}")
end
# Dispatches to any named view.
def view name, query={}, &block
name = name.to_s
view_name = "#{design_doc_slug}/#{name}"
puts view_name
fetch_view_with_docs(view_name, query, true, &block)
end
private
def fetch_view_with_docs name, opts, raw=false
def fetch_view_with_docs name, opts, raw=false, &block
if raw
fetch_view name, opts
fetch_view name, opts, &block
else
begin
view = fetch_view name, opts.merge({:include_docs => true})
view['rows'].collect{|r|new(r['doc'])}
view = fetch_view name, opts.merge({:include_docs => true}), &block
view['rows'].collect{|r|new(r['doc'])} if view
rescue
# fallback for old versions of couchdb that don't
# have include_docs support
view = fetch_view name, opts
view['rows'].collect{|r|new(database.get(r['id']))}
view = fetch_view name, opts, &block
view['rows'].collect{|r|new(database.get(r['id']))} if view
end
end
end
def fetch_view view_name, opts
def fetch_view view_name, opts, &block
retryable = true
begin
database.view(view_name, opts)
puts "block" if block
database.view(view_name, opts, &block)
# the design doc could have been deleted by a rouge process
rescue RestClient::ResourceNotFound => e
if retryable

View file

@ -6,15 +6,21 @@ module CouchRest
end
# Stream a view, yielding one row at a time. Shells out to <tt>curl</tt> to keep RAM usage low when you have millions of rows.
def view name, params = nil
def view name, params = nil, &block
urlst = /^_/.match(name) ? "#{@db.root}/#{name}" : "#{@db.root}/_view/#{name}"
url = CouchRest.paramify_url urlst, params
first = nil
IO.popen("curl --silent #{url}") do |view|
view.gets # discard header
while row = parse_line(view.gets)
yield row
first = view.gets # discard header
# puts first
while line = view.gets
# puts line
row = parse_line(line)
block.call row
end
end
# parse_line(line)
first
end
private